Updated configuration layout
[collectd.git] / src / ceph.c
1 /**
2  * collectd - src/ceph.c
3  * Copyright (C) 2011  New Dream Network
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Colin McCabe <cmccabe@alumni.cmu.edu>
20  *   Dennis Zou <yunzou@cisco.com>
21  *   Dan Ryder <daryder@cisco.com>
22  **/
23
24 #define _BSD_SOURCE
25
26 #include "collectd.h"
27 #include "common.h"
28 #include "plugin.h"
29
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <json/json.h>
34 #include <json/json_object_private.h> /* need for struct json_object_iter */
35 #include <limits.h>
36 #include <poll.h>
37 #include <stdint.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <strings.h>
42 #include <sys/socket.h>
43 #include <sys/time.h>
44 #include <sys/types.h>
45 #include <sys/un.h>
46 #include <unistd.h>
/** Size of a data source name buffer, including the terminating NUL */
#define MAX_RRD_DS_NAME_LEN 20

/**
 * Evaluate expr and store its result in ret, repeating the call for as long
 * as it fails with errno == EINTR.
 *
 * On success ret holds the (non-negative) result of expr; on any other
 * failure ret holds -errno.
 *
 * The body is a single do/while statement so that the macro can be used
 * wherever a statement is allowed, including an unbraced if/else branch.
 * Note that ret is evaluated several times.
 */
#define RETRY_ON_EINTR(ret, expr) \
	do { \
		(ret) = (expr); \
		if ((ret) >= 0) \
			break; \
		(ret) = -errno; \
	} while ((ret) == -EINTR)

/** Timeout interval in seconds */
#define CEPH_TIMEOUT_INTERVAL 1

/** Maximum path length for a UNIX domain socket on this system */
#define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
64
65 /******* ceph_daemon *******/
/** State kept for one monitored ceph daemon */
struct ceph_daemon
{
	/** Version of the admin_socket interface */
	uint32_t version;
	/** Daemon name */
	char name[DATA_MAX_NAME_LEN];

	/** Number of entries in dset and in pc_types */
	int dset_num;

	/** Path to the socket that we use to talk to the ceph daemon */
	char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];

	/** Dynamically allocated array of dset_num data sets.
	 * dset[i].type         Name of the data set
	 * dset[i].ds_num       Number of data sources (key/value pairs)
	 * dset[i].ds           Dynamically allocated array of key/value pairs
	 */
	struct data_set_s *dset;
	/** Dynamically allocated array of dset_num arrays;
	 * pc_types[i][j] is the perfcounter type of dset[i].ds[j] */
	int **pc_types;
};
88
/** Bit flags tested against the perfcounter type value of a data source */
enum perfcounter_type_d
{
	PERFCOUNTER_LONGRUNAVG = 1 << 2,
	PERFCOUNTER_COUNTER = 1 << 3
};
93
94 /** Array of daemons to monitor */
95 static struct ceph_daemon **g_daemons = NULL;
96
97 /** Number of elements in g_daemons */
98 static int g_num_daemons = 0;
99
100 static void ceph_daemon_print(const struct ceph_daemon *d)
101 {
102         DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
103 }
104
105 static void ceph_daemons_print(void)
106 {
107         int i;
108         for (i = 0; i < g_num_daemons; ++i)
109         {
110                 ceph_daemon_print(g_daemons[i]);
111         }
112 }
113
114 /*static void ceph_daemon_free(struct ceph_daemon *d)
115  {
116  plugin_unregister_data_set(d->dset.type);
117  sfree(d->dset.ds);
118  sfree(d);
119  }*/
120 static void ceph_daemon_free(struct ceph_daemon *d)
121 {
122         int i = 0;
123         for (; i < d->dset_num; i++)
124         {
125                 plugin_unregister_data_set((d->dset + i)->type);
126                 sfree(d->dset->ds);
127                 sfree(d->pc_types[i]);
128         }
129         sfree(d->dset);
130         sfree(d->pc_types);
131         sfree(d);
132 }
133
134 static void compact_ds_name(char *source, char *dest)
135 {
136         int keys_num = 0, i;
137         char *save_ptr = NULL, *tmp_ptr = source;
138         char *keys[16];
139         char len_str[3];
140         char tmp[DATA_MAX_NAME_LEN];
141         int reserved = 0;
142         int offset = 0;
143         memset(tmp, 0, sizeof(tmp));
144         if (source == NULL || dest == NULL || source[0] == '\0' || dest[0] != '\0')
145         {
146                 return;
147         }
148         size_t src_len = strlen(source);
149         snprintf(len_str, sizeof(len_str), "%zu", src_len);
150         unsigned char append_status = 0x0;
151         append_status |= (source[src_len - 1] == '-') ? 0x1 : 0x0;
152         append_status |= (source[src_len - 1] == '+') ? 0x2 : 0x0;
153         while ((keys[keys_num] = strtok_r(tmp_ptr, ":_-+", &save_ptr)) != NULL)
154         {
155                 tmp_ptr = NULL;
156                 /** capitalize 1st char **/
157                 keys[keys_num][0] = toupper(keys[keys_num][0]);
158                 keys_num++;
159                 if (keys_num >= 16)
160                         break;
161         }
162         /** concatenate each part of source string **/
163         for (i = 0; i < keys_num; i++)
164         {
165                 strcat(tmp, keys[i]);
166         }
167         tmp[DATA_MAX_NAME_LEN - 1] = '\0';
168         /** to coordinate limitation of length of ds name from RRD
169          *  we will truncate ds_name
170          *  when the its length is more than
171          *  MAX_RRD_DS_NAME_LEN
172          */
173         if (strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
174         {
175                 append_status |= 0x4;
176                 /** we should reserve space for
177                  * len_str
178                  */
179                 reserved += 2;
180         }
181         if (append_status & 0x1)
182         {
183                 /** we should reserve space for
184                  * "Minus"
185                  */
186                 reserved += 5;
187         }
188         if (append_status & 0x2)
189         {
190                 /** we should reserve space for
191                  * "Plus"
192                  */
193                 reserved += 4;
194         }
195         snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
196         offset = strlen(dest);
197         switch (append_status)
198         {
199         case 0x1:
200                 memcpy(dest + offset, "Minus", 5);
201                 break;
202         case 0x2:
203                 memcpy(dest + offset, "Plus", 5);
204                 break;
205         case 0x4:
206                 memcpy(dest + offset, len_str, 2);
207                 break;
208         case 0x5:
209                 memcpy(dest + offset, "Minus", 5);
210                 memcpy(dest + offset + 5, len_str, 2);
211                 break;
212         case 0x6:
213                 memcpy(dest + offset, "Plus", 4);
214                 memcpy(dest + offset + 4, len_str, 2);
215                 break;
216         default:
217                 break;
218         }
219 }
220 static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
221 {
222         char *ptr, *rptr;
223         size_t dset_name_len = 0;
224         size_t ds_name_len = 0;
225         char tmp_ds_name[DATA_MAX_NAME_LEN];
226         memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
227         if (dset_name == NULL || ds_name == NULL || key_str == NULL
228                         || key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
229         {
230                 return -1;
231         }
232         if ((ptr = strchr(key_str, '.')) == NULL
233                         || (rptr = strrchr(key_str, '.')) == NULL)
234         {
235                 strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
236                 strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
237                 goto compact;
238         }
239         dset_name_len =
240                         (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
241                                         (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
242         memcpy(dset_name, key_str, dset_name_len);
243         ds_name_len =
244                         (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
245         if (ds_name_len == 0)
246         { /** only have two keys **/
247                 if (!strncmp(rptr + 1, "type", 4))
248                 {/** if last key is "type",ignore **/
249                         strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
250                 }
251                 else
252                 {/** if last key isn't "type", copy last key **/
253                         strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
254                 }
255         }
256         else if (!strncmp(rptr + 1, "type", 4))
257         {/** more than two keys **/
258                 memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
259         }
260         else
261         {/** copy whole keys **/
262                 strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
263         }
264         compact: compact_ds_name(tmp_ds_name, ds_name);
265         return 0;
266 }
267
268 int get_matching_dset(const struct ceph_daemon *d, const char *name)
269 {
270         int idx;
271         for (idx = 0; idx < d->dset_num; ++idx)
272         {
273                 if (strcmp(d->dset[idx].type, name) == 0)
274                 {
275                         return idx;
276                 }
277         }
278         return -1;
279 }
280
281 int get_matching_value(const struct data_set_s *dset, const char *name,
282                 int num_values)
283 {
284         int idx;
285         for (idx = 0; idx < num_values; ++idx)
286         {
287                 if (strcmp(dset->ds[idx].name, name) == 0)
288                 {
289                         return idx;
290                 }
291         }
292         return -1;
293 }
294
295 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
296                 int pc_type)
297 {
298         struct data_source_s *ds;
299         struct data_set_s *dset;
300         struct data_set_s *dset_array;
301         int **pc_types_array = NULL;
302         int *pc_types;
303         int *pc_types_new;
304         int idx = 0;
305         if (strlen(name) + 1 > DATA_MAX_NAME_LEN)
306                 return -ENAMETOOLONG;
307         char dset_name[DATA_MAX_NAME_LEN];
308         char ds_name[MAX_RRD_DS_NAME_LEN];
309         memset(dset_name, 0, sizeof(dset_name));
310         memset(ds_name, 0, sizeof(ds_name));
311         if (parse_keys(name, dset_name, ds_name))
312                 return 1;
313         idx = get_matching_dset(d, dset_name);
314         if (idx == -1)
315         {/* need to add a dset **/
316                 dset_array = realloc(d->dset,
317                                 sizeof(struct data_set_s) * (d->dset_num + 1));
318                 if (!dset_array)
319                         return -ENOMEM;
320                 pc_types_array = realloc(d->pc_types,
321                                 sizeof(int *) * (d->dset_num + 1));
322                 if (!pc_types_array)
323                         return -ENOMEM;
324                 dset = &dset_array[d->dset_num];
325                 /** this step is very important, otherwise,
326                  *  realloc for dset->ds will tricky because of
327                  *  a random addr in dset->ds
328                  */
329                 memset(dset, 0, sizeof(struct data_set_s));
330                 dset->ds_num = 0;
331                 snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
332                 pc_types = pc_types_array[d->dset_num] = NULL;
333                 d->dset = dset_array;
334         }
335         else
336         {
337                 dset = &d->dset[idx];
338                 pc_types = d->pc_types[idx];
339         }
340         struct data_source_s *ds_array = realloc(dset->ds,
341                         sizeof(struct data_source_s) * (dset->ds_num + 1));
342         if (!ds_array)
343         {
344                 return -ENOMEM;
345         }
346         pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
347         if (!pc_types_new)
348         {
349                 return -ENOMEM;
350         }
351         dset->ds = ds_array;
352         if (idx == -1)
353         {
354                 pc_types_array[d->dset_num] = pc_types_new;
355                 d->pc_types = pc_types_array;
356                 d->pc_types[d->dset_num][dset->ds_num] = pc_type;
357                 d->dset_num++;
358         }
359         else
360         {
361                 d->pc_types[idx] = pc_types_new;
362                 d->pc_types[idx][dset->ds_num] = pc_type;
363         }
364         ds = &ds_array[dset->ds_num++];
365         snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
366         ds->type =
367                         (pc_type & PERFCOUNTER_COUNTER) ? DS_TYPE_COUNTER : DS_TYPE_GAUGE;
368         ds->min = NAN;
369         ds->max = NAN;
370         return 0;
371 }
372
373 /******* ceph_config *******/
374 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
375 {
376         const char *val;
377         if (item->values_num != 1)
378         {
379                 return -ENOTSUP;
380         }
381         if (item->values[0].type != OCONFIG_TYPE_STRING)
382         {
383                 return -ENOTSUP;
384         }
385         val = item->values[0].value.string;
386         if (snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
387         {
388                 ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
389                                 item->key);
390                 return -ENAMETOOLONG;
391         }
392         return 0;
393 }
394
395 static int cc_add_daemon_config(oconfig_item_t *ci)
396 {
397         int ret, i;
398         struct ceph_daemon *array, *nd, cd;
399         memset(&cd, 0, sizeof(struct ceph_daemon));
400
401         if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
402         {
403                 WARNING("ceph plugin: `Daemon' blocks need exactly one string argument.");
404                 return (-1);
405         }
406
407         ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
408         if (ret)
409                 return ret;
410
411         for (i=0; i < ci->children_num; i++)
412         {
413                 oconfig_item_t *child = ci->children + i;
414
415                 if (strcasecmp("SocketPath", child->key) == 0)
416                 {
417                         ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
418                         if (ret)
419                                 return ret;
420                 }
421                 else
422                 {
423                         WARNING("ceph plugin: ignoring unknown option %s", child->key);
424                 }
425         }
426         if (cd.name[0] == '\0')
427         {
428                 ERROR("ceph plugin: you must configure a daemon name.\n");
429                 return -EINVAL;
430         }
431         else if (cd.asok_path[0] == '\0')
432         {
433                 ERROR("ceph plugin(name=%s): you must configure an administrative "
434                 "socket path.\n", cd.name);
435                 return -EINVAL;
436         }
437         else if (!((cd.asok_path[0] == '/')
438                         || (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
439         {
440                 ERROR("ceph plugin(name=%s): administrative socket paths must begin with "
441                                 "'/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
442                 return -EINVAL;
443         }
444         array = realloc(g_daemons,
445                         sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
446         if (array == NULL)
447         {
448                 /* The positive return value here indicates that this is a
449                  * runtime error, not a configuration error.  */
450                 return ENOMEM;
451         }
452         g_daemons = (struct ceph_daemon**) array;
453         nd = malloc(sizeof(struct ceph_daemon));
454         if (!nd)
455                 return ENOMEM;
456         memcpy(nd, &cd, sizeof(struct ceph_daemon));
457         g_daemons[g_num_daemons++] = nd;
458         return 0;
459 }
460
461 static int ceph_config(oconfig_item_t *ci)
462 {
463         int ret, i;
464
465         for (i = 0; i < ci->children_num; ++i)
466         {
467                 oconfig_item_t *child = ci->children + i;
468                 if (strcasecmp("Daemon", child->key) == 0)
469                 {
470                         ret = cc_add_daemon_config(child);
471                         if (ret)
472                                 return ret;
473                 }
474                 else
475                 {
476                         WARNING("ceph plugin: ignoring unknown option %s", child->key);
477                 }
478         }
479         return 0;
480 }
481
482 /******* JSON parsing *******/
483 typedef int (*node_handler_t)(void*, json_object*, const char*);
484
485 /** Perform a depth-first traversal of the JSON parse tree,
486  * calling node_handler at each node.*/
487 static int traverse_json_impl(json_object *jo, char *key, int max_key,
488                 node_handler_t handler, void *handler_arg)
489 {
490         struct json_object_iter iter;
491         int ret, plen, klen;
492
493         if (json_object_get_type(jo) != json_type_object)
494                 return 0;
495         plen = strlen(key);
496         json_object_object_foreachC(jo, iter)
497         {
498                 klen = strlen(iter.key);
499                 if (plen + klen + 2 > max_key)
500                         return -ENAMETOOLONG;
501                 if (plen != 0)
502                         strncat(key, ".", max_key); /* really should be strcat */
503                 strncat(key, iter.key, max_key);
504
505                 ret = handler(handler_arg, iter.val, key);
506                 if (ret == 1)
507                 {
508                         ret = traverse_json_impl(iter.val, key, max_key, handler,
509                                         handler_arg);
510                 }
511                 else if (ret != 0)
512                 {
513                         return ret;
514                 }
515
516                 key[plen] = '\0';
517         }
518         return 0;
519 }
520
521 static int traverse_json(const char *json, node_handler_t handler,
522                 void *handler_arg)
523 {
524         json_object *root;
525         char buf[128];
526         buf[0] = '\0';
527         root = json_tokener_parse(json);
528         if (!root)
529                 return -EDOM;
530         int result = traverse_json_impl(root, buf, sizeof(buf), handler, handler_arg);
531         json_object_put(root);
532         return result;
533 }
534
535 static int node_handler_define_schema(void *arg, json_object *jo,
536                 const char *key)
537 {
538         struct ceph_daemon *d = (struct ceph_daemon *) arg;
539         int pc_type;
540         if (json_object_get_type(jo) == json_type_object)
541                 return 1;
542         else if (json_object_get_type(jo) != json_type_int)
543                 return -EDOM;
544         pc_type = json_object_get_int(jo);
545         DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
546                         d->name, key, pc_type);
547         return ceph_daemon_add_ds_entry(d, key, pc_type);
548 }
549 struct values_holder
550 {
551         int values_len;
552         value_t *values;
553 };
554
555 /** A set of values_t data that we build up in memory while parsing the JSON. */
556 struct values_tmp
557 {
558         struct ceph_daemon *d;
559         int holder_num;
560         struct values_holder vh[0];
561 };
562
563 static int node_handler_fetch_data(void *arg, json_object *jo, const char *key)
564 {
565         int dset_idx, ds_idx;
566         value_t *uv;
567         char dset_name[DATA_MAX_NAME_LEN];
568         char ds_name[MAX_RRD_DS_NAME_LEN];
569         struct values_tmp *vtmp = (struct values_tmp*) arg;
570         memset(dset_name, 0, sizeof(dset_name));
571         memset(ds_name, 0, sizeof(ds_name));
572         if (parse_keys(key, dset_name, ds_name))
573                 return 1;DEBUG("enter node_handler_fetch_data");
574         dset_idx = get_matching_dset(vtmp->d, dset_name);
575         if (dset_idx == -1)
576                 return 1;
577         ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
578                         vtmp->d->dset[dset_idx].ds_num);
579         if (ds_idx == -1)
580                 return 1;DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
581                         dset_name,ds_name,dset_idx,ds_idx);
582         uv = &(vtmp->vh[dset_idx].values[ds_idx]);
583         if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LONGRUNAVG)
584         {
585                 json_object *avgcount, *sum;
586                 uint64_t avgcounti;
587                 double sumd;
588                 if (json_object_get_type(jo) != json_type_object)
589                         return -EINVAL;
590                 avgcount = json_object_object_get(jo, "avgcount");
591                 sum = json_object_object_get(jo, "sum");
592                 if ((!avgcount) || (!sum))
593                         return -EINVAL;
594                 avgcounti = json_object_get_int(avgcount);
595                 DEBUG("avgcounti:%ld",avgcounti);
596                 if (avgcounti == 0)
597                         avgcounti = 1;
598                 sumd = json_object_get_int(sum);
599                 DEBUG("sumd:%lf",sumd);
600                 uv->gauge = sumd / avgcounti;
601                 DEBUG("uv->gauge = sumd / avgcounti = :%lf",uv->gauge);
602         }
603         else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_COUNTER)
604         {
605                 /* We use json_object_get_double here because anything > 32 
606                  * bits may get truncated by json_object_get_int */
607                 uv->counter = json_object_get_double(jo);
608                 DEBUG("uv->counter %ld",(long)uv->counter);
609         }
610         else
611         {
612                 uv->gauge = json_object_get_double(jo);
613                 DEBUG("uv->gauge %lf",uv->gauge);
614         }
615         return 0;
616 }
617
618 /******* network I/O *******/
/** State of a connection to a daemon's admin socket */
enum cstate_t
{
	/** no socket is open */
	CSTATE_UNCONNECTED = 0,
	/** connected, the request still has to be written */
	CSTATE_WRITE_REQUEST = 1,
	/** reading the reply to a version request */
	CSTATE_READ_VERSION = 2,
	/** reading the length of the JSON reply */
	CSTATE_READ_AMT = 3,
	CSTATE_READ_JSON = 4
};
627
/** Kinds of request that can be issued over the admin socket */
enum request_type_t
{
	ASOK_REQ_VERSION = 0,
	ASOK_REQ_DATA,
	ASOK_REQ_SCHEMA,
	ASOK_REQ_NONE = 1000
};
635
/** One connection to the admin socket of a ceph daemon */
struct cconn
{
	/** The Ceph daemon that we're talking to */
	struct ceph_daemon *d;

	/** Request type (one of enum request_type_t) */
	uint32_t request_type;

	/** The connection state */
	enum cstate_t state;

	/** The socket we use to talk to this daemon; -1 once closed */
	int asok;

	/** Number of bytes already written / read in the current state;
	 * used as the offset for the next partial write or read. */
	uint32_t amt;

	/** Length of the JSON to read */
	uint32_t json_len;

	/** Buffer containing JSON data */
	char *json;
};
659
660 static int cconn_connect(struct cconn *io)
661 {
662         struct sockaddr_un address;
663         int flags, fd, err;
664         if (io->state != CSTATE_UNCONNECTED)
665         {
666                 ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED");
667                 return -EDOM;
668         }
669         fd = socket(PF_UNIX, SOCK_STREAM, 0);
670         if (fd < 0)
671         {
672                 int err = -errno;
673                 ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: "
674                 "error %d", err);
675                 return err;
676         }
677         memset(&address, 0, sizeof(struct sockaddr_un));
678         address.sun_family = AF_UNIX;
679         snprintf(address.sun_path, sizeof(address.sun_path), "%s",
680                         io->d->asok_path);
681         RETRY_ON_EINTR(err,
682                 connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
683         if (err < 0)
684         {
685                 ERROR("cconn_connect: connect(%d) failed: error %d", fd, err);
686                 return err;
687         }
688
689         flags = fcntl(fd, F_GETFL, 0);
690         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
691         {
692                 err = -errno;
693                 ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err);
694                 return err;
695         }
696         io->asok = fd;
697         io->state = CSTATE_WRITE_REQUEST;
698         io->amt = 0;
699         io->json_len = 0;
700         io->json = NULL;
701         return 0;
702 }
703
704 static void cconn_close(struct cconn *io)
705 {
706         io->state = CSTATE_UNCONNECTED;
707         if (io->asok != -1)
708         {
709                 int res;
710                 RETRY_ON_EINTR(res, close(io->asok));
711         }
712         io->asok = -1;
713         io->amt = 0;
714         io->json_len = 0;
715         sfree(io->json);
716         io->json = NULL;
717 }
718
719 /* Process incoming JSON counter data */
720 /*static int cconn_process_data(struct cconn *io)
721  {
722  int ret;
723  value_list_t vl = VALUE_LIST_INIT;
724  struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) +
725  (sizeof(value_t) * io->d->dset.ds_num));
726  if (!vtmp)
727  return -ENOMEM;
728  vtmp->d = io->d;
729  vtmp->values_len = io->d->dset.ds_num;
730  ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
731  if (ret)
732  goto done;
733  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
734  sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
735  sstrncpy(vl.type, io->d->dset.type, sizeof(vl.type));
736  vl.values = vtmp->values;
737  vl.values_len = vtmp->values_len;
738  DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
739  io->d->dset.type, vl.values_len, io->json);
740  ret = plugin_dispatch_values(&vl);
741  done:
742  sfree(vtmp);
743  return ret;
744  }*/
745 static int cconn_process_data(struct cconn *io)
746 {
747         int i, ret = 0;
748         struct values_tmp *vtmp = calloc(1,
749                         sizeof(struct values_tmp)
750                                         + (sizeof(struct values_holder)) * io->d->dset_num);
751         if (!vtmp)
752                 return -ENOMEM;
753         for (i = 0; i < io->d->dset_num; i++)
754         {
755                 value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
756                 vtmp->vh[i].values = val;
757                 vtmp->vh[i].values_len = io->d->dset[i].ds_num;
758         }
759         vtmp->d = io->d;
760         vtmp->holder_num = io->d->dset_num;
761         ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
762         if (ret)
763                 goto done;
764         for (i = 0; i < vtmp->holder_num; i++)
765         {
766                 value_list_t vl = VALUE_LIST_INIT;
767                 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
768                 sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
769                 strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
770                 sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
771                 vl.values = vtmp->vh[i].values;
772                 vl.values_len = vtmp->vh[i].values_len;
773                 DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
774                                 io->d->name, vl.values_len, io->json);
775                 ret = plugin_dispatch_values(&vl);
776                 if (ret)
777                         goto done;
778         }
779
780         done: for (i = 0; i < vtmp->holder_num; i++)
781         {
782                 sfree(vtmp->vh[i].values);
783         }
784         sfree(vtmp);
785         return ret;
786 }
787
788 static int cconn_process_json(struct cconn *io)
789 {
790         switch (io->request_type)
791         {
792         case ASOK_REQ_DATA:
793                 return cconn_process_data(io);
794         case ASOK_REQ_SCHEMA:
795                 return traverse_json(io->json, node_handler_define_schema, io->d);
796         default:
797                 return -EDOM;
798         }
799 }
800
801 static int cconn_validate_revents(struct cconn *io, int revents)
802 {
803         if (revents & POLLERR)
804         {
805                 ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name);
806                 return -EIO;
807         }
808         switch (io->state)
809         {
810         case CSTATE_WRITE_REQUEST:
811                 return (revents & POLLOUT) ? 0 : -EINVAL;
812         case CSTATE_READ_VERSION:
813         case CSTATE_READ_AMT:
814         case CSTATE_READ_JSON:
815                 return (revents & POLLIN) ? 0 : -EINVAL;
816                 return (revents & POLLIN) ? 0 : -EINVAL;
817         default:
818                 ERROR("cconn_validate_revents(name=%s) got to illegal state on line %d",
819                                 io->d->name, __LINE__);
820                 return -EDOM;
821         }
822 }
823
824 /** Handle a network event for a connection */
825 static int cconn_handle_event(struct cconn *io)
826 {
827         int ret;
828         switch (io->state)
829         {
830         case CSTATE_UNCONNECTED:
831                 ERROR("cconn_handle_event(name=%s) got to illegal state on line %d",
832                                 io->d->name, __LINE__);
833
834                 return -EDOM;
835         case CSTATE_WRITE_REQUEST:
836         {
837                 char cmd[32];
838                 /*snprintf(cmd, sizeof(cmd), "%s%d%s", "{\"prefix\":\"", io->request_type,
839                  "\"}");*/
840                 char req_type_str[2];
841                 snprintf(req_type_str, sizeof(req_type_str), "%1.1d", io->request_type);
842                 json_object *cmd_object = json_object_new_object();
843                 json_object_object_add(cmd_object, "prefix",
844                                 json_object_new_string(req_type_str));
845                 const char *cmd_json = json_object_to_json_string(cmd_object);
846                 /** we should send '\n' to server **/
847                 snprintf(cmd, sizeof(cmd), "%s\n", cmd_json);
848                 size_t cmd_len = strlen(cmd);
849                 RETRY_ON_EINTR(ret,
850                                 write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
851                 DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
852                                 io->d->name, io->state, io->amt, ret);
853                 if (ret < 0)
854                         return ret;
855                 io->amt += ret;
856                 if (io->amt >= cmd_len)
857                 {
858                         io->amt = 0;
859                         switch (io->request_type)
860                         {
861                         case ASOK_REQ_VERSION:
862                                 io->state = CSTATE_READ_VERSION;
863                                 break;
864                         default:
865                                 io->state = CSTATE_READ_AMT;
866                                 break;
867                         }
868                 }
869                 json_object_put(cmd_object);
870                 return 0;
871         }
872         case CSTATE_READ_VERSION:
873         {
874                 RETRY_ON_EINTR(ret,
875                                 read(io->asok, ((char*)(&io->d->version)) + io->amt,
876                                                 sizeof(io->d->version) - io->amt));
877                 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
878                                 io->d->name, io->state, ret);
879                 if (ret < 0)
880                         return ret;
881                 io->amt += ret;
882                 if (io->amt >= sizeof(io->d->version))
883                 {
884                         io->d->version = ntohl(io->d->version);
885                         if (io->d->version != 1)
886                         {
887                                 ERROR("cconn_handle_event(name=%s) not "
888                                 "expecting version %d!", io->d->name, io->d->version);
889                                 return -ENOTSUP;
890                         }DEBUG("cconn_handle_event(name=%s): identified as "
891                                         "version %d", io->d->name, io->d->version);
892                         io->amt = 0;
893                         cconn_close(io);
894                         io->request_type = ASOK_REQ_SCHEMA;
895                 }
896                 return 0;
897         }
898         case CSTATE_READ_AMT:
899         {
900                 RETRY_ON_EINTR(ret,
901                                 read(io->asok, ((char*)(&io->json_len)) + io->amt,
902                                                 sizeof(io->json_len) - io->amt));
903                 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
904                                 io->d->name, io->state, ret);
905                 if (ret < 0)
906                         return ret;
907                 io->amt += ret;
908                 if (io->amt >= sizeof(io->json_len))
909                 {
910                         io->json_len = ntohl(io->json_len);
911                         io->amt = 0;
912                         io->state = CSTATE_READ_JSON;
913                         io->json = calloc(1, io->json_len + 1);
914                         if (!io->json)
915                                 return -ENOMEM;
916                 }
917                 return 0;
918         }
919         case CSTATE_READ_JSON:
920         {
921                 RETRY_ON_EINTR(ret,
922                                 read(io->asok, io->json + io->amt, io->json_len - io->amt));
923                 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
924                                 io->d->name, io->state, ret);
925                 if (ret < 0)
926                         return ret;
927                 io->amt += ret;
928                 if (io->amt >= io->json_len)
929                 {
930                         ret = cconn_process_json(io);
931                         if (ret)
932                                 return ret;
933                         cconn_close(io);
934                         io->request_type = ASOK_REQ_NONE;
935                 }
936                 return 0;
937         }
938         default:
939                 ERROR("cconn_handle_event(name=%s) got to illegal state on "
940                 "line %d", io->d->name, __LINE__);
941                 return -EDOM;
942         }
943 }
944
945 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
946 {
947         int ret;
948         if (io->request_type == ASOK_REQ_NONE)
949         {
950                 /* The request has already been serviced. */
951                 return 0;
952         }
953         else if ((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
954         {
955                 /* If there are no counters to report on, don't bother
956                  * connecting */
957                 return 0;
958         }
959
960         switch (io->state)
961         {
962         case CSTATE_UNCONNECTED:
963                 ret = cconn_connect(io);
964                 if (ret > 0)
965                         return -ret;
966                 else if (ret < 0)
967                         return ret;
968                 fds->fd = io->asok;
969                 fds->events = POLLOUT;
970                 return 1;
971         case CSTATE_WRITE_REQUEST:
972                 fds->fd = io->asok;
973                 fds->events = POLLOUT;
974                 return 1;
975         case CSTATE_READ_VERSION:
976         case CSTATE_READ_AMT:
977         case CSTATE_READ_JSON:
978                 fds->fd = io->asok;
979                 fds->events = POLLIN;
980                 return 1;
981         default:
982                 ERROR("cconn_prepare(name=%s) got to illegal state on line %d",
983                                 io->d->name, __LINE__);
984                 return -EDOM;
985         }
986 }
987
/** Returns the difference between two struct timevals (t1 - t2) in
 * milliseconds.
 *
 * All arithmetic is done in 64 bits so that the clamping actually takes
 * effect: on overflow we return INT_MAX / INT_MIN rather than a wrapped
 * value. The sub-millisecond part of the microsecond difference is
 * truncated toward zero.
 *
 * @param t1 the later point in time (minuend)
 * @param t2 the earlier point in time (subtrahend)
 * @return t1 - t2 in milliseconds, saturated to [INT_MIN, INT_MAX]
 */
static int milli_diff(const struct timeval *t1, const struct timeval *t2)
{
        int64_t sec_diff = (int64_t) t1->tv_sec - (int64_t) t2->tv_sec;
        int64_t usec_diff = (int64_t) t1->tv_usec - (int64_t) t2->tv_usec;
        int64_t ret;

        /* Saturate early when the seconds alone are already out of range, so
         * the multiplication below cannot overflow 64 bits either. This
         * assumes normalized timevals (tv_usec within one second). */
        if (sec_diff > (INT_MAX / 1000) + 1)
                return INT_MAX;
        if (sec_diff < (INT_MIN / 1000) - 1)
                return INT_MIN;

        ret = (sec_diff * 1000) + (usec_diff / 1000);
        if (ret > INT_MAX)
                return INT_MAX;
        else if (ret < INT_MIN)
                return INT_MIN;
        return (int) ret;
}
1004
1005 /** This handles the actual network I/O to talk to the Ceph daemons.
1006  */
1007 static int cconn_main_loop(uint32_t request_type)
1008 {
1009         int i, ret, some_unreachable = 0;
1010         struct timeval end_tv;
1011         struct cconn io_array[g_num_daemons];
1012
1013         DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
1014
1015         /* create cconn array */
1016         memset(io_array, 0, sizeof(io_array));
1017         for (i = 0; i < g_num_daemons; ++i)
1018         {
1019                 io_array[i].d = g_daemons[i];
1020                 io_array[i].request_type = request_type;
1021                 io_array[i].state = CSTATE_UNCONNECTED;
1022         }
1023
1024         /** Calculate the time at which we should give up */
1025         gettimeofday(&end_tv, NULL);
1026         end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
1027
1028         while (1)
1029         {
1030                 int nfds, diff;
1031                 struct timeval tv;
1032                 struct cconn *polled_io_array[g_num_daemons];
1033                 struct pollfd fds[g_num_daemons];
1034                 memset(fds, 0, sizeof(fds));
1035                 nfds = 0;
1036                 for (i = 0; i < g_num_daemons; ++i)
1037                 {
1038                         struct cconn *io = io_array + i;
1039                         ret = cconn_prepare(io, fds + nfds);
1040                         if (ret < 0)
1041                         {
1042                                 WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1043                                                 io->d->name, i, io->state, ret);
1044                                 cconn_close(io);
1045                                 io->request_type = ASOK_REQ_NONE;
1046                                 some_unreachable = 1;
1047                         }
1048                         else if (ret == 1)
1049                         {
1050                                 DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
1051                                                 io->d->name, i, io->state);
1052                                 polled_io_array[nfds++] = io_array + i;
1053                         }
1054                 }
1055                 if (nfds == 0)
1056                 {
1057                         /* finished */
1058                         ret = 0;
1059                         DEBUG("cconn_main_loop: no more cconn to manage.");
1060                         goto done;
1061                 }
1062                 gettimeofday(&tv, NULL);
1063                 diff = milli_diff(&end_tv, &tv);
1064                 if (diff <= 0)
1065                 {
1066                         /* Timed out */
1067                         ret = -ETIMEDOUT;
1068                         WARNING("ERROR: cconn_main_loop: timed out.\n");
1069                         goto done;
1070                 }
1071                 RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1072                 if (ret < 0)
1073                 {
1074                         ERROR("poll(2) error: %d", ret);
1075                         goto done;
1076                 }
1077                 for (i = 0; i < nfds; ++i)
1078                 {
1079                         struct cconn *io = polled_io_array[i];
1080                         int revents = fds[i].revents;
1081                         if (revents == 0)
1082                         {
1083                                 /* do nothing */
1084                         }
1085                         else if (cconn_validate_revents(io, revents))
1086                         {
1087                                 WARNING("ERROR: cconn(name=%s,i=%d,st=%d): "
1088                                 "revents validation error: "
1089                                 "revents=0x%08x", io->d->name, i, io->state, revents);
1090                                 cconn_close(io);
1091                                 io->request_type = ASOK_REQ_NONE;
1092                                 some_unreachable = 1;
1093                         }
1094                         else
1095                         {
1096                                 int ret = cconn_handle_event(io);
1097                                 if (ret)
1098                                 {
1099                                         WARNING("ERROR: cconn_handle_event(name=%s,"
1100                                         "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1101                                         cconn_close(io);
1102                                         io->request_type = ASOK_REQ_NONE;
1103                                         some_unreachable = 1;
1104                                 }
1105                         }
1106                 }
1107         }
1108         done: for (i = 0; i < g_num_daemons; ++i)
1109         {
1110                 cconn_close(io_array + i);
1111         }
1112         if (some_unreachable)
1113         {
1114                 DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
1115         }
1116         else
1117         {
1118                 DEBUG("cconn_main_loop: reached all Ceph daemons :)");
1119         }
1120         return ret;
1121 }
1122
1123 static int ceph_read(void)
1124 {
1125         return cconn_main_loop(ASOK_REQ_DATA);
1126 }
1127
1128 /******* lifecycle *******/
1129 static int ceph_init(void)
1130 {
1131         int i, ret, j;
1132         DEBUG("ceph_init");
1133         ceph_daemons_print();
1134
1135         ret = cconn_main_loop(ASOK_REQ_VERSION);
1136         if (ret)
1137                 return ret;
1138         for (i = 0; i < g_num_daemons; ++i)
1139         {
1140                 struct ceph_daemon *d = g_daemons[i];
1141                 for (j = 0; j < d->dset_num; j++)
1142                 {
1143                         ret = plugin_register_data_set(d->dset + j);
1144                         if (ret)
1145                         {
1146                                 ERROR("plugin_register_data_set(%s) failed!", d->name);
1147                         }
1148                         else
1149                         {
1150                                 DEBUG("plugin_register_data_set(%s): "
1151                                                 "(d->dset)[%d]->ds_num=%d",
1152                                                 d->name, j, d->dset[j].ds_num);
1153                         }
1154                 }
1155         }
1156         return 0;
1157 }
1158
1159 static int ceph_shutdown(void)
1160 {
1161         int i;
1162         for (i = 0; i < g_num_daemons; ++i)
1163         {
1164                 ceph_daemon_free(g_daemons[i]);
1165         }
1166         sfree(g_daemons);
1167         g_daemons = NULL;
1168         g_num_daemons = 0;
1169         DEBUG("finished ceph_shutdown");
1170         return 0;
1171 }
1172
1173 void module_register(void)
1174 {
1175         plugin_register_complex_config("ceph", ceph_config);
1176         plugin_register_init("ceph", ceph_init);
1177         plugin_register_read("ceph", ceph_read);
1178         plugin_register_shutdown("ceph", ceph_shutdown);
1179 }