3b2481282237dc30e2288e991c0ccfa0a0b09f2c
[collectd.git] / src / ceph.c
1 /**
2  * collectd - src/ceph.c
3  * Copyright (C) 2011  New Dream Network
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Colin McCabe <cmccabe@alumni.cmu.edu>
20  *   Dennis Zou <yunzou@cisco.com>
21  *   Dan Ryder <daryder@cisco.com>
22  **/
23
24 #define _BSD_SOURCE
25
26 #include "collectd.h"
27 #include "common.h"
28 #include "plugin.h"
29
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <json/json.h>
34 #include <json/json_object_private.h> /* need for struct json_object_iter */
35 #include <limits.h>
36 #include <poll.h>
37 #include <stdint.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <strings.h>
42 #include <sys/socket.h>
43 #include <sys/time.h>
44 #include <sys/types.h>
45 #include <sys/un.h>
46 #include <unistd.h>
47 #define MAX_RRD_DS_NAME_LEN 20
48
/**
 * Evaluate `expr` (a syscall-style expression returning < 0 on failure and
 * setting errno) into `ret`, re-evaluating it while it fails with EINTR.
 * On exit `ret` holds either the non-negative result or -errno.
 * Wrapped in do { } while (0) so the macro behaves as a single statement
 * (safe inside an unbraced if/else).
 */
#define RETRY_ON_EINTR(ret, expr) \
        do { \
                while (1) { \
                        (ret) = (expr); \
                        if ((ret) >= 0) \
                                break; \
                        (ret) = -errno; \
                        if ((ret) != -EINTR) \
                                break; \
                } \
        } while (0)
58
/** Timeout interval in seconds
 *  NOTE(review): its use is outside this part of the file — confirm which
 *  operation it bounds. */
#define CEPH_TIMEOUT_INTERVAL 1

/** Maximum path length for a UNIX domain socket on this system: the size in
 *  bytes of sockaddr_un.sun_path (terminating NUL included), taken via a
 *  null-pointer member access inside sizeof, which is never evaluated. */
#define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
64
/******* ceph_daemon *******/
/** Per-daemon state: the configuration from one `Daemon` block plus the
 *  data sets built from that daemon's perf-counter schema. */
struct ceph_daemon
{
        /** Version of the admin_socket interface */
        uint32_t version;
        /** daemon name (the `Daemon "<name>"` argument); also used as the
         *  plugin_instance of dispatched values **/
        char name[DATA_MAX_NAME_LEN];

        /** Number of entries in the dset and pc_types arrays */
        int dset_num;

        /** Path to the socket that we use to talk to the ceph daemon */
        char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];

        /** Dynamically allocated array of dset_num data sets, one per
         *  distinct first component (text before the first '.') of the
         *  perf-counter keys:
         *   dset[i].type        data-set name
         *   dset[i].ds_num      number of data sources in that set
         *   dset[i].ds          dynamically allocated array of data sources
         */
        struct data_set_s *dset;
        /** pc_types[i][j] holds the Ceph perf-counter type flags
         *  (PERFCOUNTER_*) of data source dset[i].ds[j] */
        int **pc_types;
};
88
/** Bit flags carried by the integer type values of the perf-counter schema. */
enum perfcounter_type_d
{
        /* reported as an object with "avgcount" and "sum" members */
        PERFCOUNTER_LATENCY = 0x4,
        /* dispatched as a DS_TYPE_DERIVE data source */
        PERFCOUNTER_DERIVE = 0x8,
};
93
/** Array of daemons to monitor: one heap-allocated entry per `Daemon`
 *  configuration block, grown by cc_add_daemon_config(). */
static struct ceph_daemon **g_daemons = NULL;

/** Number of elements in g_daemons */
static int g_num_daemons = 0;
99
/** Emit a DEBUG log line with one daemon's name and admin-socket path. */
static void ceph_daemon_print(const struct ceph_daemon *d)
{
        DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
}
104
105 static void ceph_daemons_print(void)
106 {
107         int i;
108         for (i = 0; i < g_num_daemons; ++i)
109         {
110                 ceph_daemon_print(g_daemons[i]);
111         }
112 }
113
/** Previous-poll sum/count of every latency counter seen so far (grown one
 *  entry at a time by update_last()); file-private, hence static. */
static struct last_data **last_poll_data = NULL;
/** Number of valid entries in last_poll_data */
static int last_idx = 0;
116
117 /*static void ceph_daemon_free(struct ceph_daemon *d)
118  {
119  plugin_unregister_data_set(d->dset.type);
120  sfree(d->dset.ds);
121  sfree(d);
122  }*/
123 static void ceph_daemon_free(struct ceph_daemon *d)
124 {
125         int i = 0;
126         for (; i < d->dset_num; i++)
127         {
128                 plugin_unregister_data_set((d->dset + i)->type);
129                 sfree(d->dset->ds);
130                 sfree(d->pc_types[i]);
131         }
132         sfree(d->dset);
133         sfree(d->pc_types);
134         sfree(d);
135 }
136
137 static void compact_ds_name(char *source, char *dest)
138 {
139         int keys_num = 0, i;
140         char *save_ptr = NULL, *tmp_ptr = source;
141         char *keys[16];
142         char len_str[3];
143         char tmp[DATA_MAX_NAME_LEN];
144         int reserved = 0;
145         int offset = 0;
146         memset(tmp, 0, sizeof(tmp));
147         if (source == NULL || dest == NULL || source[0] == '\0' || dest[0] != '\0')
148         {
149                 return;
150         }
151         size_t src_len = strlen(source);
152         snprintf(len_str, sizeof(len_str), "%zu", src_len);
153         unsigned char append_status = 0x0;
154         append_status |= (source[src_len - 1] == '-') ? 0x1 : 0x0;
155         append_status |= (source[src_len - 1] == '+') ? 0x2 : 0x0;
156         while ((keys[keys_num] = strtok_r(tmp_ptr, ":_-+", &save_ptr)) != NULL)
157         {
158                 tmp_ptr = NULL;
159                 /** capitalize 1st char **/
160                 keys[keys_num][0] = toupper(keys[keys_num][0]);
161                 keys_num++;
162                 if (keys_num >= 16)
163                         break;
164         }
165         /** concatenate each part of source string **/
166         for (i = 0; i < keys_num; i++)
167         {
168                 strcat(tmp, keys[i]);
169         }
170         tmp[DATA_MAX_NAME_LEN - 1] = '\0';
171         /** to coordinate limitation of length of ds name from RRD
172          *  we will truncate ds_name
173          *  when the its length is more than
174          *  MAX_RRD_DS_NAME_LEN
175          */
176         if (strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
177         {
178                 append_status |= 0x4;
179                 /** we should reserve space for
180                  * len_str
181                  */
182                 reserved += 2;
183         }
184         if (append_status & 0x1)
185         {
186                 /** we should reserve space for
187                  * "Minus"
188                  */
189                 reserved += 5;
190         }
191         if (append_status & 0x2)
192         {
193                 /** we should reserve space for
194                  * "Plus"
195                  */
196                 reserved += 4;
197         }
198         snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
199         offset = strlen(dest);
200         switch (append_status)
201         {
202         case 0x1:
203                 memcpy(dest + offset, "Minus", 5);
204                 break;
205         case 0x2:
206                 memcpy(dest + offset, "Plus", 5);
207                 break;
208         case 0x4:
209                 memcpy(dest + offset, len_str, 2);
210                 break;
211         case 0x5:
212                 memcpy(dest + offset, "Minus", 5);
213                 memcpy(dest + offset + 5, len_str, 2);
214                 break;
215         case 0x6:
216                 memcpy(dest + offset, "Plus", 4);
217                 memcpy(dest + offset + 4, len_str, 2);
218                 break;
219         default:
220                 break;
221         }
222 }
223 static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
224 {
225         char *ptr, *rptr;
226         size_t dset_name_len = 0;
227         size_t ds_name_len = 0;
228         char tmp_ds_name[DATA_MAX_NAME_LEN];
229         memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
230         if (dset_name == NULL || ds_name == NULL || key_str == NULL
231                         || key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
232         {
233                 return -1;
234         }
235         if ((ptr = strchr(key_str, '.')) == NULL
236                         || (rptr = strrchr(key_str, '.')) == NULL)
237         {
238                 strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
239                 strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
240                 goto compact;
241         }
242         dset_name_len =
243                         (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
244                                         (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
245         memcpy(dset_name, key_str, dset_name_len);
246         ds_name_len =
247                         (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
248         if (ds_name_len == 0)
249         { /** only have two keys **/
250                 if (!strncmp(rptr + 1, "type", 4))
251                 {/** if last key is "type",ignore **/
252                         strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
253                 }
254                 else
255                 {/** if last key isn't "type", copy last key **/
256                         strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
257                 }
258         }
259         else if (!strncmp(rptr + 1, "type", 4))
260         {/** more than two keys **/
261                 memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
262         }
263         else
264         {/** copy whole keys **/
265                 strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
266         }
267         compact: compact_ds_name(tmp_ds_name, ds_name);
268         return 0;
269 }
270
271 int get_matching_dset(const struct ceph_daemon *d, const char *name)
272 {
273         int idx;
274         for (idx = 0; idx < d->dset_num; ++idx)
275         {
276                 if (strcmp(d->dset[idx].type, name) == 0)
277                 {
278                         return idx;
279                 }
280         }
281         return -1;
282 }
283
284 int get_matching_value(const struct data_set_s *dset, const char *name,
285                 int num_values)
286 {
287         int idx;
288         for (idx = 0; idx < num_values; ++idx)
289         {
290                 if (strcmp(dset->ds[idx].name, name) == 0)
291                 {
292                         return idx;
293                 }
294         }
295         return -1;
296 }
297
298 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
299                 int pc_type)
300 {
301         struct data_source_s *ds;
302         struct data_set_s *dset;
303         struct data_set_s *dset_array;
304         int **pc_types_array = NULL;
305         int *pc_types;
306         int *pc_types_new;
307         int idx = 0;
308         if (strlen(name) + 1 > DATA_MAX_NAME_LEN)
309                 return -ENAMETOOLONG;
310         char dset_name[DATA_MAX_NAME_LEN];
311         char ds_name[MAX_RRD_DS_NAME_LEN];
312         memset(dset_name, 0, sizeof(dset_name));
313         memset(ds_name, 0, sizeof(ds_name));
314         if (parse_keys(name, dset_name, ds_name))
315                 return 1;
316         idx = get_matching_dset(d, dset_name);
317         if (idx == -1)
318         {/* need to add a dset **/
319                 dset_array = realloc(d->dset,
320                                 sizeof(struct data_set_s) * (d->dset_num + 1));
321                 if (!dset_array)
322                         return -ENOMEM;
323                 pc_types_array = realloc(d->pc_types,
324                                 sizeof(int *) * (d->dset_num + 1));
325                 if (!pc_types_array)
326                         return -ENOMEM;
327                 dset = &dset_array[d->dset_num];
328                 /** this step is very important, otherwise,
329                  *  realloc for dset->ds will tricky because of
330                  *  a random addr in dset->ds
331                  */
332                 memset(dset, 0, sizeof(struct data_set_s));
333                 dset->ds_num = 0;
334                 snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
335                 pc_types = pc_types_array[d->dset_num] = NULL;
336                 d->dset = dset_array;
337         }
338         else
339         {
340                 dset = &d->dset[idx];
341                 pc_types = d->pc_types[idx];
342         }
343         struct data_source_s *ds_array = realloc(dset->ds,
344                         sizeof(struct data_source_s) * (dset->ds_num + 1));
345         if (!ds_array)
346         {
347                 return -ENOMEM;
348         }
349         pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
350         if (!pc_types_new)
351         {
352                 return -ENOMEM;
353         }
354         dset->ds = ds_array;
355         if (idx == -1)
356         {
357                 pc_types_array[d->dset_num] = pc_types_new;
358                 d->pc_types = pc_types_array;
359                 d->pc_types[d->dset_num][dset->ds_num] = pc_type;
360                 d->dset_num++;
361         }
362         else
363         {
364                 d->pc_types[idx] = pc_types_new;
365                 d->pc_types[idx][dset->ds_num] = pc_type;
366         }
367         ds = &ds_array[dset->ds_num++];
368         snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
369         ds->type =
370                         (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE;
371                         
372          /** Special case for filestore:JournalWrBytes, we don't want to
373             use what the Ceph schema gives us (sum/count pair) */
374         if((strcmp(dset_name,"filestore") == 0) &&
375            (strcmp(ds_name,"JournalWrBytes") == 0))
376         {
377                 ds->type = DS_TYPE_DERIVE;
378         }
379         /** Use min of 0 for DERIVE types so we dont' get negative values
380             on Ceph service restart */
381         if(ds->type == DS_TYPE_DERIVE)
382         {
383                 ds->min = 0;
384         }
385         else if(ds->type == DS_TYPE_GAUGE)
386         {
387                 ds->min = NAN;
388         }
389
390         ds->max = NAN;
391         return 0;
392 }
393
394 /******* ceph_config *******/
395 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
396 {
397         const char *val;
398         if (item->values_num != 1)
399         {
400                 return -ENOTSUP;
401         }
402         if (item->values[0].type != OCONFIG_TYPE_STRING)
403         {
404                 return -ENOTSUP;
405         }
406         val = item->values[0].value.string;
407         if (snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
408         {
409                 ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
410                                 item->key);
411                 return -ENAMETOOLONG;
412         }
413         return 0;
414 }
415
416 static int cc_add_daemon_config(oconfig_item_t *ci)
417 {
418         int ret, i;
419         struct ceph_daemon *array, *nd, cd;
420         memset(&cd, 0, sizeof(struct ceph_daemon));
421
422         if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
423         {
424                 WARNING("ceph plugin: `Daemon' blocks need exactly one string argument.");
425                 return (-1);
426         }
427
428         ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
429         if (ret)
430                 return ret;
431
432         for (i=0; i < ci->children_num; i++)
433         {
434                 oconfig_item_t *child = ci->children + i;
435
436                 if (strcasecmp("SocketPath", child->key) == 0)
437                 {
438                         ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
439                         if (ret)
440                                 return ret;
441                 }
442                 else
443                 {
444                         WARNING("ceph plugin: ignoring unknown option %s", child->key);
445                 }
446         }
447         if (cd.name[0] == '\0')
448         {
449                 ERROR("ceph plugin: you must configure a daemon name.\n");
450                 return -EINVAL;
451         }
452         else if (cd.asok_path[0] == '\0')
453         {
454                 ERROR("ceph plugin(name=%s): you must configure an administrative "
455                 "socket path.\n", cd.name);
456                 return -EINVAL;
457         }
458         else if (!((cd.asok_path[0] == '/')
459                         || (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
460         {
461                 ERROR("ceph plugin(name=%s): administrative socket paths must begin with "
462                                 "'/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
463                 return -EINVAL;
464         }
465         array = realloc(g_daemons,
466                         sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
467         if (array == NULL)
468         {
469                 /* The positive return value here indicates that this is a
470                  * runtime error, not a configuration error.  */
471                 return ENOMEM;
472         }
473         g_daemons = (struct ceph_daemon**) array;
474         nd = malloc(sizeof(struct ceph_daemon));
475         if (!nd)
476                 return ENOMEM;
477         memcpy(nd, &cd, sizeof(struct ceph_daemon));
478         g_daemons[g_num_daemons++] = nd;
479         return 0;
480 }
481
482 static int ceph_config(oconfig_item_t *ci)
483 {
484         int ret, i;
485
486         for (i = 0; i < ci->children_num; ++i)
487         {
488                 oconfig_item_t *child = ci->children + i;
489                 if (strcasecmp("Daemon", child->key) == 0)
490                 {
491                         ret = cc_add_daemon_config(child);
492                         if (ret)
493                                 return ret;
494                 }
495                 else
496                 {
497                         WARNING("ceph plugin: ignoring unknown option %s", child->key);
498                 }
499         }
500         return 0;
501 }
502
503 /******* JSON parsing *******/
/** Callback invoked by traverse_json_impl() for each member of a JSON object.
 *  Arguments: handler-specific context, the member's value, and the
 *  dot-separated key path to it. Returning 1 asks the traversal to descend
 *  into the value; 0 continues with the next sibling; any other value is
 *  returned immediately from that level of the traversal. */
typedef int (*node_handler_t)(void*, json_object*, const char*);
505
506 /** Perform a depth-first traversal of the JSON parse tree,
507  * calling node_handler at each node.*/
508 static int traverse_json_impl(json_object *jo, char *key, int max_key,
509                 node_handler_t handler, void *handler_arg)
510 {
511         struct json_object_iter iter;
512         int ret, plen, klen;
513
514         if (json_object_get_type(jo) != json_type_object)
515                 return 0;
516         plen = strlen(key);
517         json_object_object_foreachC(jo, iter)
518         {
519                 klen = strlen(iter.key);
520                 if (plen + klen + 2 > max_key)
521                         return -ENAMETOOLONG;
522                 if (plen != 0)
523                         strncat(key, ".", max_key); /* really should be strcat */
524                 strncat(key, iter.key, max_key);
525
526                 ret = handler(handler_arg, iter.val, key);
527                 if (ret == 1)
528                 {
529                         ret = traverse_json_impl(iter.val, key, max_key, handler,
530                                         handler_arg);
531                 }
532                 else if (ret != 0)
533                 {
534                         return ret;
535                 }
536
537                 key[plen] = '\0';
538         }
539         return 0;
540 }
541
542 static int traverse_json(const char *json, node_handler_t handler,
543                 void *handler_arg)
544 {
545         json_object *root;
546         char buf[128];
547         buf[0] = '\0';
548         root = json_tokener_parse(json);
549         if (!root)
550                 return -EDOM;
551         int result = traverse_json_impl(root, buf, sizeof(buf), handler, handler_arg);
552         json_object_put(root);
553         return result;
554 }
555
556 static int node_handler_define_schema(void *arg, json_object *jo,
557                 const char *key)
558 {
559         struct ceph_daemon *d = (struct ceph_daemon *) arg;
560         int pc_type;
561         if (json_object_get_type(jo) == json_type_object)
562                 return 1;
563         else if (json_object_get_type(jo) != json_type_int)
564                 return -EDOM;
565         pc_type = json_object_get_int(jo);
566         DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
567                         d->name, key, pc_type);
568         return ceph_daemon_add_ds_entry(d, key, pc_type);
569 }
/** Values collected for one data set during a single poll. */
struct values_holder
{
        /** Number of entries in values (the data set's ds_num) */
        int values_len;
        /** Heap array of values_len entries, indexed like dset[i].ds */
        value_t *values;
};
575
576 /** A set of values_t data that we build up in memory while parsing the JSON. */
577 struct values_tmp
578 {
579         struct ceph_daemon *d;
580         int holder_num;
581         struct values_holder vh[0];
582 };
583
/** Previous-poll state of one latency counter, kept so that get_last_avg()
 *  can compute the average over the last polling interval. */
struct last_data
{
        /** Data-set and data-source names identifying the counter */
        char dset_name[DATA_MAX_NAME_LEN];
        char ds_name[MAX_RRD_DS_NAME_LEN];
        /** "sum" and "avgcount" values seen at the previous poll */
        double last_sum;
        uint64_t last_count;
};
591
592 int add_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
593 {
594         last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
595         if(!last_poll_data[last_idx])
596         {
597                 return ENOMEM;
598         }
599         sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,sizeof(last_poll_data[last_idx]->dset_name));
600         sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,sizeof(last_poll_data[last_idx]->ds_name));
601         last_poll_data[last_idx]->last_sum = cur_sum;
602         last_poll_data[last_idx]->last_count = cur_count;
603         last_idx++;
604         return 1;
605 }
606
607 int update_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
608 {
609         int i;
610         for(i = 0; i < last_idx; i++)
611         {
612                 if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0)
613                 {
614                         if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0)
615                         {
616                                 last_poll_data[i]->last_sum = cur_sum;
617                                 last_poll_data[i]->last_count = cur_count;
618                                 return 1;
619                         }
620                 }
621         }
622
623         if(NULL == last_poll_data)
624         {
625                 last_poll_data = malloc(1 * sizeof(struct last_data *));
626                 if(!last_poll_data)
627                 {
628                         return ENOMEM;
629                 }
630         }
631         else
632         {
633                 struct last_data **tmp_last = realloc(last_poll_data, ((last_idx+1) * sizeof(struct last_data *)));
634                 if(!tmp_last)
635                 {
636                         return ENOMEM;
637                 }
638                 last_poll_data = tmp_last;
639         }
640         add_last(dset_n,ds_n,cur_sum,cur_count);
641         return -1;
642 }
643
644 double get_last_avg(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
645 {
646         int i;
647         double result = -1.1;
648         double sum_delt = 0.0;
649         uint64_t count_delt = 0;
650         for(i = 0; i < last_idx; i++)
651         {
652                 if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0)
653                 {
654                         if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0)
655                         {
656                                 if(cur_count < last_poll_data[i]->last_count)
657                                 {
658                                         break;
659                                 }
660                                 sum_delt = (cur_sum - last_poll_data[i]->last_sum);
661                                 count_delt = (cur_count - last_poll_data[i]->last_count);
662                                 result = (sum_delt / count_delt);
663                                 break;
664                         }
665                 }
666         }
667
668         result = (result == -1.1) ? NAN : result;
669         update_last(dset_n,ds_n,cur_sum,cur_count);
670         return result;
671 }
672
673 static int node_handler_fetch_data(void *arg, json_object *jo, const char *key)
674 {
675         int dset_idx, ds_idx;
676         value_t *uv;
677         char dset_name[DATA_MAX_NAME_LEN];
678         char ds_name[MAX_RRD_DS_NAME_LEN];
679         struct values_tmp *vtmp = (struct values_tmp*) arg;
680         memset(dset_name, 0, sizeof(dset_name));
681         memset(ds_name, 0, sizeof(ds_name));
682         if (parse_keys(key, dset_name, ds_name))
683                 return 1;DEBUG("enter node_handler_fetch_data");
684         dset_idx = get_matching_dset(vtmp->d, dset_name);
685         if (dset_idx == -1)
686                 return 1;
687         ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
688                         vtmp->d->dset[dset_idx].ds_num);
689         if (ds_idx == -1)
690                 return 1;DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
691                         dset_name,ds_name,dset_idx,ds_idx);
692         uv = &(vtmp->vh[dset_idx].values[ds_idx]);
693         
694         /** Special case for filestore:JournalWrBytes, we don't want to
695             use what the Ceph schema gives us */
696         if((strcmp(dset_name,"filestore") == 0) &&
697            (strcmp(ds_name,"JournalWrBytes") == 0))
698         {
699                 json_object *sum;
700                 sum = json_object_object_get(jo, "sum");
701                 if(!sum)
702                         return -EINVAL;
703                 uv->derive = (uint64_t) json_object_get_double(sum);
704                 DEBUG("uv derive = %" PRIu64 "",(uint64_t) uv->derive);
705         }
706         else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY)
707         {
708                 json_object *avgcount, *sum;
709                 uint64_t avgcounti;
710                 double sumd;
711                 if (json_object_get_type(jo) != json_type_object)
712                         return -EINVAL;
713                 avgcount = json_object_object_get(jo, "avgcount");
714                 sum = json_object_object_get(jo, "sum");
715                 if ((!avgcount) || (!sum))
716                         return -EINVAL;
717                 avgcounti = json_object_get_int(avgcount);
718                 DEBUG("avgcounti:%ld",avgcounti);
719                 if (avgcounti == 0)
720                         avgcounti = 1;
721                 sumd = json_object_get_double(sum);
722                 DEBUG("sumd:%lf",sumd);
723                 double last_avg = get_last_avg(dset_name, ds_name, sumd, avgcounti);
724                 uv->gauge = last_avg;
725                 DEBUG("uv->gauge = (sumd_now - sumd_last) / (avgcounti_now - avgcounti_last) = :%lf",uv->gauge);
726         }
727         else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE)
728         {
729                 /* We use json_object_get_double here because anything > 32 
730                  * bits may get truncated by json_object_get_int */
731                 uv->derive = (uint64_t) json_object_get_double(jo);
732                 DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive);
733         }
734         else
735         {
736                 uv->gauge = json_object_get_double(jo);
737                 DEBUG("uv->gauge %lf",uv->gauge);
738         }
739         return 0;
740 }
741
742 /******* network I/O *******/
/** Connection states of a struct cconn; cconn_connect() moves it from
 *  CSTATE_UNCONNECTED to CSTATE_WRITE_REQUEST, and cconn_close() resets it. */
enum cstate_t
{
        CSTATE_UNCONNECTED = 0,
        CSTATE_WRITE_REQUEST = 1,
        CSTATE_READ_VERSION = 2,
        CSTATE_READ_AMT = 3,
        CSTATE_READ_JSON = 4,
};
751
/** Kind of admin-socket request; stored in cconn::request_type and used by
 *  cconn_process_json() to decide how the JSON reply is handled. */
enum request_type_t
{
        ASOK_REQ_VERSION = 0,
        ASOK_REQ_DATA = 1,   /* reply handled by cconn_process_data() */
        ASOK_REQ_SCHEMA = 2, /* reply walked with node_handler_define_schema */
        ASOK_REQ_NONE = 1000,
};
759
/** State of one admin-socket connection to a Ceph daemon (opened by
 *  cconn_connect(), torn down by cconn_close()). */
struct cconn
{
        /** The Ceph daemon that we're talking to */
        struct ceph_daemon *d;

        /** Request type (an enum request_type_t value) */
        uint32_t request_type;

        /** The connection state */
        enum cstate_t state;

        /** The socket we use to talk to this daemon (-1 when closed) */
        int asok;

        /** The amount of data remaining to read / write. */
        uint32_t amt;

        /** Length of the JSON to read */
        uint32_t json_len;

        /** Buffer containing JSON data (heap memory, freed by cconn_close) */
        char *json;
};
783
784 static int cconn_connect(struct cconn *io)
785 {
786         struct sockaddr_un address;
787         int flags, fd, err;
788         if (io->state != CSTATE_UNCONNECTED)
789         {
790                 ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED");
791                 return -EDOM;
792         }
793         fd = socket(PF_UNIX, SOCK_STREAM, 0);
794         if (fd < 0)
795         {
796                 int err = -errno;
797                 ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: "
798                 "error %d", err);
799                 return err;
800         }
801         memset(&address, 0, sizeof(struct sockaddr_un));
802         address.sun_family = AF_UNIX;
803         snprintf(address.sun_path, sizeof(address.sun_path), "%s",
804                         io->d->asok_path);
805         RETRY_ON_EINTR(err,
806                 connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
807         if (err < 0)
808         {
809                 ERROR("cconn_connect: connect(%d) failed: error %d", fd, err);
810                 return err;
811         }
812
813         flags = fcntl(fd, F_GETFL, 0);
814         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
815         {
816                 err = -errno;
817                 ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err);
818                 return err;
819         }
820         io->asok = fd;
821         io->state = CSTATE_WRITE_REQUEST;
822         io->amt = 0;
823         io->json_len = 0;
824         io->json = NULL;
825         return 0;
826 }
827
828 static void cconn_close(struct cconn *io)
829 {
830         io->state = CSTATE_UNCONNECTED;
831         if (io->asok != -1)
832         {
833                 int res;
834                 RETRY_ON_EINTR(res, close(io->asok));
835         }
836         io->asok = -1;
837         io->amt = 0;
838         io->json_len = 0;
839         sfree(io->json);
840         io->json = NULL;
841 }
842
843 /* Process incoming JSON counter data */
844 /*static int cconn_process_data(struct cconn *io)
845  {
846  int ret;
847  value_list_t vl = VALUE_LIST_INIT;
848  struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) +
849  (sizeof(value_t) * io->d->dset.ds_num));
850  if (!vtmp)
851  return -ENOMEM;
852  vtmp->d = io->d;
853  vtmp->values_len = io->d->dset.ds_num;
854  ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
855  if (ret)
856  goto done;
857  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
858  sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
859  sstrncpy(vl.type, io->d->dset.type, sizeof(vl.type));
860  vl.values = vtmp->values;
861  vl.values_len = vtmp->values_len;
862  DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
863  io->d->dset.type, vl.values_len, io->json);
864  ret = plugin_dispatch_values(&vl);
865  done:
866  sfree(vtmp);
867  return ret;
868  }*/
869 static int cconn_process_data(struct cconn *io)
870 {
871         int i, ret = 0;
872         struct values_tmp *vtmp = calloc(1,
873                         sizeof(struct values_tmp)
874                                         + (sizeof(struct values_holder)) * io->d->dset_num);
875         if (!vtmp)
876                 return -ENOMEM;
877         for (i = 0; i < io->d->dset_num; i++)
878         {
879                 value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
880                 vtmp->vh[i].values = val;
881                 vtmp->vh[i].values_len = io->d->dset[i].ds_num;
882         }
883         vtmp->d = io->d;
884         vtmp->holder_num = io->d->dset_num;
885         ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
886         if (ret)
887                 goto done;
888         for (i = 0; i < vtmp->holder_num; i++)
889         {
890                 value_list_t vl = VALUE_LIST_INIT;
891                 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
892                 sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
893                 strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
894                 sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
895                 vl.values = vtmp->vh[i].values;
896                 vl.values_len = vtmp->vh[i].values_len;
897                 DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
898                                 io->d->name, vl.values_len, io->json);
899                 ret = plugin_dispatch_values(&vl);
900                 if (ret)
901                         goto done;
902         }
903
904         done: for (i = 0; i < vtmp->holder_num; i++)
905         {
906                 sfree(vtmp->vh[i].values);
907         }
908         sfree(vtmp);
909         return ret;
910 }
911
912 static int cconn_process_json(struct cconn *io)
913 {
914         switch (io->request_type)
915         {
916         case ASOK_REQ_DATA:
917                 return cconn_process_data(io);
918         case ASOK_REQ_SCHEMA:
919                 return traverse_json(io->json, node_handler_define_schema, io->d);
920         default:
921                 return -EDOM;
922         }
923 }
924
925 static int cconn_validate_revents(struct cconn *io, int revents)
926 {
927         if (revents & POLLERR)
928         {
929                 ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name);
930                 return -EIO;
931         }
932         switch (io->state)
933         {
934         case CSTATE_WRITE_REQUEST:
935                 return (revents & POLLOUT) ? 0 : -EINVAL;
936         case CSTATE_READ_VERSION:
937         case CSTATE_READ_AMT:
938         case CSTATE_READ_JSON:
939                 return (revents & POLLIN) ? 0 : -EINVAL;
940                 return (revents & POLLIN) ? 0 : -EINVAL;
941         default:
942                 ERROR("cconn_validate_revents(name=%s) got to illegal state on line %d",
943                                 io->d->name, __LINE__);
944                 return -EDOM;
945         }
946 }
947
948 /** Handle a network event for a connection */
949 static int cconn_handle_event(struct cconn *io)
950 {
951         int ret;
952         switch (io->state)
953         {
954         case CSTATE_UNCONNECTED:
955                 ERROR("cconn_handle_event(name=%s) got to illegal state on line %d",
956                                 io->d->name, __LINE__);
957
958                 return -EDOM;
959         case CSTATE_WRITE_REQUEST:
960         {
961                 char cmd[32];
962                 /*snprintf(cmd, sizeof(cmd), "%s%d%s", "{\"prefix\":\"", io->request_type,
963                  "\"}");*/
964                 char req_type_str[2];
965                 snprintf(req_type_str, sizeof(req_type_str), "%1.1d", io->request_type);
966                 json_object *cmd_object = json_object_new_object();
967                 json_object_object_add(cmd_object, "prefix",
968                                 json_object_new_string(req_type_str));
969                 const char *cmd_json = json_object_to_json_string(cmd_object);
970                 /** we should send '\n' to server **/
971                 snprintf(cmd, sizeof(cmd), "%s\n", cmd_json);
972                 size_t cmd_len = strlen(cmd);
973                 RETRY_ON_EINTR(ret,
974                                 write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
975                 DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
976                                 io->d->name, io->state, io->amt, ret);
977                 if (ret < 0)
978                         return ret;
979                 io->amt += ret;
980                 if (io->amt >= cmd_len)
981                 {
982                         io->amt = 0;
983                         switch (io->request_type)
984                         {
985                         case ASOK_REQ_VERSION:
986                                 io->state = CSTATE_READ_VERSION;
987                                 break;
988                         default:
989                                 io->state = CSTATE_READ_AMT;
990                                 break;
991                         }
992                 }
993                 json_object_put(cmd_object);
994                 return 0;
995         }
996         case CSTATE_READ_VERSION:
997         {
998                 RETRY_ON_EINTR(ret,
999                                 read(io->asok, ((char*)(&io->d->version)) + io->amt,
1000                                                 sizeof(io->d->version) - io->amt));
1001                 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1002                                 io->d->name, io->state, ret);
1003                 if (ret < 0)
1004                         return ret;
1005                 io->amt += ret;
1006                 if (io->amt >= sizeof(io->d->version))
1007                 {
1008                         io->d->version = ntohl(io->d->version);
1009                         if (io->d->version != 1)
1010                         {
1011                                 ERROR("cconn_handle_event(name=%s) not "
1012                                 "expecting version %d!", io->d->name, io->d->version);
1013                                 return -ENOTSUP;
1014                         }DEBUG("cconn_handle_event(name=%s): identified as "
1015                                         "version %d", io->d->name, io->d->version);
1016                         io->amt = 0;
1017                         cconn_close(io);
1018                         io->request_type = ASOK_REQ_SCHEMA;
1019                 }
1020                 return 0;
1021         }
1022         case CSTATE_READ_AMT:
1023         {
1024                 RETRY_ON_EINTR(ret,
1025                                 read(io->asok, ((char*)(&io->json_len)) + io->amt,
1026                                                 sizeof(io->json_len) - io->amt));
1027                 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1028                                 io->d->name, io->state, ret);
1029                 if (ret < 0)
1030                         return ret;
1031                 io->amt += ret;
1032                 if (io->amt >= sizeof(io->json_len))
1033                 {
1034                         io->json_len = ntohl(io->json_len);
1035                         io->amt = 0;
1036                         io->state = CSTATE_READ_JSON;
1037                         io->json = calloc(1, io->json_len + 1);
1038                         if (!io->json)
1039                                 return -ENOMEM;
1040                 }
1041                 return 0;
1042         }
1043         case CSTATE_READ_JSON:
1044         {
1045                 RETRY_ON_EINTR(ret,
1046                                 read(io->asok, io->json + io->amt, io->json_len - io->amt));
1047                 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1048                                 io->d->name, io->state, ret);
1049                 if (ret < 0)
1050                         return ret;
1051                 io->amt += ret;
1052                 if (io->amt >= io->json_len)
1053                 {
1054                         ret = cconn_process_json(io);
1055                         if (ret)
1056                                 return ret;
1057                         cconn_close(io);
1058                         io->request_type = ASOK_REQ_NONE;
1059                 }
1060                 return 0;
1061         }
1062         default:
1063                 ERROR("cconn_handle_event(name=%s) got to illegal state on "
1064                 "line %d", io->d->name, __LINE__);
1065                 return -EDOM;
1066         }
1067 }
1068
1069 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
1070 {
1071         int ret;
1072         if (io->request_type == ASOK_REQ_NONE)
1073         {
1074                 /* The request has already been serviced. */
1075                 return 0;
1076         }
1077         else if ((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
1078         {
1079                 /* If there are no counters to report on, don't bother
1080                  * connecting */
1081                 return 0;
1082         }
1083
1084         switch (io->state)
1085         {
1086         case CSTATE_UNCONNECTED:
1087                 ret = cconn_connect(io);
1088                 if (ret > 0)
1089                         return -ret;
1090                 else if (ret < 0)
1091                         return ret;
1092                 fds->fd = io->asok;
1093                 fds->events = POLLOUT;
1094                 return 1;
1095         case CSTATE_WRITE_REQUEST:
1096                 fds->fd = io->asok;
1097                 fds->events = POLLOUT;
1098                 return 1;
1099         case CSTATE_READ_VERSION:
1100         case CSTATE_READ_AMT:
1101         case CSTATE_READ_JSON:
1102                 fds->fd = io->asok;
1103                 fds->events = POLLIN;
1104                 return 1;
1105         default:
1106                 ERROR("cconn_prepare(name=%s) got to illegal state on line %d",
1107                                 io->d->name, __LINE__);
1108                 return -EDOM;
1109         }
1110 }
1111
/** Returns the difference t1 - t2 between two struct timevals in milliseconds.
 * Sub-millisecond precision is discarded. On overflow, we return max/min int.
 */
static int milli_diff(const struct timeval *t1, const struct timeval *t2)
{
        /* Compute in 64 bits. With int arithmetic, time_t would be truncated
         * and sec_diff * 1000 could overflow (undefined behavior) before the
         * clamps below ever applied. */
        int64_t sec_diff = (int64_t) t1->tv_sec - (int64_t) t2->tv_sec;
        int64_t usec_diff = (int64_t) t1->tv_usec - (int64_t) t2->tv_usec;
        int64_t ret;

        /* Beyond these bounds the result cannot fit in an int anyway. Clamping
         * here also keeps the multiplication below far from int64_t limits. */
        if (sec_diff > INT_MAX / 1000 + 1)
                return INT_MAX;
        if (sec_diff < INT_MIN / 1000 - 1)
                return INT_MIN;

        ret = usec_diff / 1000;
        ret += sec_diff * 1000;
        if (ret > INT_MAX)
                return INT_MAX;
        else if (ret < INT_MIN)
                return INT_MIN;
        return (int) ret;
}
1128
1129 /** This handles the actual network I/O to talk to the Ceph daemons.
1130  */
1131 static int cconn_main_loop(uint32_t request_type)
1132 {
1133         int i, ret, some_unreachable = 0;
1134         struct timeval end_tv;
1135         struct cconn io_array[g_num_daemons];
1136
1137         DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
1138
1139         /* create cconn array */
1140         memset(io_array, 0, sizeof(io_array));
1141         for (i = 0; i < g_num_daemons; ++i)
1142         {
1143                 io_array[i].d = g_daemons[i];
1144                 io_array[i].request_type = request_type;
1145                 io_array[i].state = CSTATE_UNCONNECTED;
1146         }
1147
1148         /** Calculate the time at which we should give up */
1149         gettimeofday(&end_tv, NULL);
1150         end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
1151
1152         while (1)
1153         {
1154                 int nfds, diff;
1155                 struct timeval tv;
1156                 struct cconn *polled_io_array[g_num_daemons];
1157                 struct pollfd fds[g_num_daemons];
1158                 memset(fds, 0, sizeof(fds));
1159                 nfds = 0;
1160                 for (i = 0; i < g_num_daemons; ++i)
1161                 {
1162                         struct cconn *io = io_array + i;
1163                         ret = cconn_prepare(io, fds + nfds);
1164                         if (ret < 0)
1165                         {
1166                                 WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1167                                                 io->d->name, i, io->state, ret);
1168                                 cconn_close(io);
1169                                 io->request_type = ASOK_REQ_NONE;
1170                                 some_unreachable = 1;
1171                         }
1172                         else if (ret == 1)
1173                         {
1174                                 DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
1175                                                 io->d->name, i, io->state);
1176                                 polled_io_array[nfds++] = io_array + i;
1177                         }
1178                 }
1179                 if (nfds == 0)
1180                 {
1181                         /* finished */
1182                         ret = 0;
1183                         DEBUG("cconn_main_loop: no more cconn to manage.");
1184                         goto done;
1185                 }
1186                 gettimeofday(&tv, NULL);
1187                 diff = milli_diff(&end_tv, &tv);
1188                 if (diff <= 0)
1189                 {
1190                         /* Timed out */
1191                         ret = -ETIMEDOUT;
1192                         WARNING("ERROR: cconn_main_loop: timed out.\n");
1193                         goto done;
1194                 }
1195                 RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1196                 if (ret < 0)
1197                 {
1198                         ERROR("poll(2) error: %d", ret);
1199                         goto done;
1200                 }
1201                 for (i = 0; i < nfds; ++i)
1202                 {
1203                         struct cconn *io = polled_io_array[i];
1204                         int revents = fds[i].revents;
1205                         if (revents == 0)
1206                         {
1207                                 /* do nothing */
1208                         }
1209                         else if (cconn_validate_revents(io, revents))
1210                         {
1211                                 WARNING("ERROR: cconn(name=%s,i=%d,st=%d): "
1212                                 "revents validation error: "
1213                                 "revents=0x%08x", io->d->name, i, io->state, revents);
1214                                 cconn_close(io);
1215                                 io->request_type = ASOK_REQ_NONE;
1216                                 some_unreachable = 1;
1217                         }
1218                         else
1219                         {
1220                                 int ret = cconn_handle_event(io);
1221                                 if (ret)
1222                                 {
1223                                         WARNING("ERROR: cconn_handle_event(name=%s,"
1224                                         "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1225                                         cconn_close(io);
1226                                         io->request_type = ASOK_REQ_NONE;
1227                                         some_unreachable = 1;
1228                                 }
1229                         }
1230                 }
1231         }
1232         done: for (i = 0; i < g_num_daemons; ++i)
1233         {
1234                 cconn_close(io_array + i);
1235         }
1236         if (some_unreachable)
1237         {
1238                 DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
1239         }
1240         else
1241         {
1242                 DEBUG("cconn_main_loop: reached all Ceph daemons :)");
1243         }
1244         return ret;
1245 }
1246
1247 static int ceph_read(void)
1248 {
1249         return cconn_main_loop(ASOK_REQ_DATA);
1250 }
1251
1252 /******* lifecycle *******/
1253 static int ceph_init(void)
1254 {
1255         int i, ret, j;
1256         DEBUG("ceph_init");
1257         ceph_daemons_print();
1258
1259         ret = cconn_main_loop(ASOK_REQ_VERSION);
1260         if (ret)
1261                 return ret;
1262         for (i = 0; i < g_num_daemons; ++i)
1263         {
1264                 struct ceph_daemon *d = g_daemons[i];
1265                 for (j = 0; j < d->dset_num; j++)
1266                 {
1267                         ret = plugin_register_data_set(d->dset + j);
1268                         if (ret)
1269                         {
1270                                 ERROR("plugin_register_data_set(%s) failed!", d->name);
1271                         }
1272                         else
1273                         {
1274                                 DEBUG("plugin_register_data_set(%s): "
1275                                                 "(d->dset)[%d]->ds_num=%d",
1276                                                 d->name, j, d->dset[j].ds_num);
1277                         }
1278                 }
1279         }
1280         return 0;
1281 }
1282
1283 static int ceph_shutdown(void)
1284 {
1285         int i;
1286         for (i = 0; i < g_num_daemons; ++i)
1287         {
1288                 ceph_daemon_free(g_daemons[i]);
1289         }
1290         sfree(g_daemons);
1291         g_daemons = NULL;
1292         g_num_daemons = 0;
1293         for(i = 0; i < last_idx; i++)
1294         {
1295                 sfree(last_poll_data[i]);
1296         }
1297         sfree(last_poll_data);
1298         last_poll_data = NULL;
1299         last_idx = 0;
1300         DEBUG("finished ceph_shutdown");
1301         return 0;
1302 }
1303
1304 void module_register(void)
1305 {
1306         plugin_register_complex_config("ceph", ceph_config);
1307         plugin_register_init("ceph", ceph_init);
1308         plugin_register_read("ceph", ceph_read);
1309         plugin_register_shutdown("ceph", ceph_shutdown);
1310 }