f48e056e68a0c5ca78f3476419732bb057f1ba52
[collectd.git] / src / ceph.c
1 /**
2  * collectd - src/ceph.c
3  * Copyright (C) 2011  New Dream Network
4  * Copyright (C) 2015  Florian octo Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Colin McCabe <cmccabe at alumni.cmu.edu>
21  *   Dennis Zou <yunzou at cisco.com>
22  *   Dan Ryder <daryder at cisco.com>
23  *   Florian octo Forster <octo at collectd.org>
24  **/
25
26 #define _DEFAULT_SOURCE
27 #define _BSD_SOURCE
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32
33 #include <arpa/inet.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <yajl/yajl_parse.h>
37 #if HAVE_YAJL_YAJL_VERSION_H
38 #include <yajl/yajl_version.h>
39 #endif
40
41 #include <limits.h>
42 #include <poll.h>
43 #include <stdint.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47 #include <strings.h>
48 #include <sys/socket.h>
49 #include <sys/time.h>
50 #include <sys/types.h>
51 #include <sys/un.h>
52 #include <unistd.h>
53 #include <math.h>
54 #include <inttypes.h>
55
56 #define RETRY_AVGCOUNT -1
57
58 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
59 # define HAVE_YAJL_V2 1
60 #endif
61
62 #define RETRY_ON_EINTR(ret, expr) \
63     while(1) { \
64         ret = expr; \
65         if(ret >= 0) \
66             break; \
67         ret = -errno; \
68         if(ret != -EINTR) \
69             break; \
70     }
71
72 /** Timeout interval in seconds */
73 #define CEPH_TIMEOUT_INTERVAL 1
74
75 /** Maximum path length for a UNIX domain socket on this system */
76 #define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
77
78 /** Yajl callback returns */
79 #define CEPH_CB_CONTINUE 1
80 #define CEPH_CB_ABORT 0
81
82 #if HAVE_YAJL_V2
83 typedef size_t yajl_len_t;
84 #else
85 typedef unsigned int yajl_len_t;
86 #endif
87
88 /** Number of types for ceph defined in types.db */
89 #define CEPH_DSET_TYPES_NUM 3
90 /** ceph types enum */
91 enum ceph_dset_type_d
92 {
93     DSET_LATENCY = 0,
94     DSET_BYTES = 1,
95     DSET_RATE = 2,
96     DSET_TYPE_UNFOUND = 1000
97 };
98
99 /** Valid types for ceph defined in types.db */
100 const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] =
101                                    {"ceph_latency", "ceph_bytes", "ceph_rate"};
102
103 /******* ceph_daemon *******/
104 struct ceph_daemon
105 {
106     /** Version of the admin_socket interface */
107     uint32_t version;
108     /** daemon name **/
109     char name[DATA_MAX_NAME_LEN];
110
111     /** Path to the socket that we use to talk to the ceph daemon */
112     char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
113
114     /** Number of counters */
115     int ds_num;
116     /** Track ds types */
117     uint32_t *ds_types;
118     /** Track ds names to match with types */
119     char **ds_names;
120
121     /**
122      * Keep track of last data for latency values so we can calculate rate
123      * since last poll.
124      */
125     struct last_data **last_poll_data;
126     /** index of last poll data */
127     int last_idx;
128 };
129
130 /******* JSON parsing *******/
131 typedef int (*node_handler_t)(void *, const char*, const char*);
132
133 /** Track state and handler while parsing JSON */
134 struct yajl_struct
135 {
136     node_handler_t handler;
137     void * handler_arg;
138     struct {
139       char key[DATA_MAX_NAME_LEN];
140       int key_len;
141     } state[YAJL_MAX_DEPTH];
142     int depth;
143 };
144 typedef struct yajl_struct yajl_struct;
145
146 enum perfcounter_type_d
147 {
148     PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
149 };
150
151 /** Give user option to use default (long run = since daemon started) avg */
152 static int long_run_latency_avg = 0;
153
154 /**
155  * Give user option to use default type for special cases -
156  * filestore.journal_wr_bytes is currently only metric here. Ceph reports the
157  * type as a sum/count pair and will calculate it the same as a latency value.
158  * All other "bytes" metrics (excluding the used/capacity bytes for the OSD)
159  * use the DERIVE type. Unless user specifies to use given type, convert this
160  * metric to use DERIVE.
161  */
162 static int convert_special_metrics = 1;
163
164 /** Array of daemons to monitor */
165 static struct ceph_daemon **g_daemons = NULL;
166
167 /** Number of elements in g_daemons */
168 static int g_num_daemons = 0;
169
170 /**
171  * A set of data that we build up in memory while parsing the JSON.
172  */
173 struct values_tmp
174 {
175     /** ceph daemon we are processing data for*/
176     struct ceph_daemon *d;
177     /** track avgcount across counters for avgcount/sum latency pairs */
178     uint64_t avgcount;
179     /** current index of counters - used to get type of counter */
180     int index;
181     /** do we already have an avgcount for latency pair */
182     int avgcount_exists;
183     /**
184      * similar to index, but current index of latency type counters -
185      * used to get last poll data of counter
186      */
187     int latency_index;
188     /**
189      * values list - maintain across counters since
190      * host/plugin/plugin instance are always the same
191      */
192     value_list_t vlist;
193 };
194
195 /**
196  * A set of count/sum pairs to keep track of latency types and get difference
197  * between this poll data and last poll data.
198  */
199 struct last_data
200 {
201     char ds_name[DATA_MAX_NAME_LEN];
202     double last_sum;
203     uint64_t last_count;
204 };
205
206 /******* network I/O *******/
207 enum cstate_t
208 {
209     CSTATE_UNCONNECTED = 0,
210     CSTATE_WRITE_REQUEST,
211     CSTATE_READ_VERSION,
212     CSTATE_READ_AMT,
213     CSTATE_READ_JSON,
214 };
215
216 enum request_type_t
217 {
218     ASOK_REQ_VERSION = 0,
219     ASOK_REQ_DATA = 1,
220     ASOK_REQ_SCHEMA = 2,
221     ASOK_REQ_NONE = 1000,
222 };
223
224 struct cconn
225 {
226     /** The Ceph daemon that we're talking to */
227     struct ceph_daemon *d;
228
229     /** Request type */
230     uint32_t request_type;
231
232     /** The connection state */
233     enum cstate_t state;
234
235     /** The socket we use to talk to this daemon */
236     int asok;
237
238     /** The amount of data remaining to read / write. */
239     uint32_t amt;
240
241     /** Length of the JSON to read */
242     uint32_t json_len;
243
244     /** Buffer containing JSON data */
245     unsigned char *json;
246
247     /** Keep data important to yajl processing */
248     struct yajl_struct yajl;
249 };
250
251 static int ceph_cb_null(void *ctx)
252 {
253     return CEPH_CB_CONTINUE;
254 }
255
256 static int ceph_cb_boolean(void *ctx, int bool_val)
257 {
258     return CEPH_CB_CONTINUE;
259 }
260
261 static int
262 ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
263 {
264     yajl_struct *yajl = (yajl_struct*)ctx;
265     char buffer[number_len+1];
266     int i, latency_type = 0, result;
267     char key[128];
268
269     memcpy(buffer, number_val, number_len);
270     buffer[sizeof(buffer) - 1] = 0;
271
272     ssnprintf(key, yajl->state[0].key_len, "%s", yajl->state[0].key);
273     for(i = 1; i < yajl->depth; i++)
274     {
275         if((i == yajl->depth-1) && ((strcmp(yajl->state[i].key,"avgcount") == 0)
276                 || (strcmp(yajl->state[i].key,"sum") == 0)))
277         {
278             if(convert_special_metrics)
279             {
280                 /**
281                  * Special case for filestore:JournalWrBytes. For some reason,
282                  * Ceph schema encodes this as a count/sum pair while all
283                  * other "Bytes" data (excluding used/capacity bytes for OSD
284                  * space) uses a single "Derive" type. To spare further
285                  * confusion, keep this KPI as the same type of other "Bytes".
286                  * Instead of keeping an "average" or "rate", use the "sum" in
287                  * the pair and assign that to the derive value.
288                  */
289                 if((strcmp(yajl->state[i-1].key, "journal_wr_bytes") == 0) &&
290                         (strcmp(yajl->state[i-2].key,"filestore") == 0) &&
291                         (strcmp(yajl->state[i].key,"avgcount") == 0))
292                 {
293                     DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
294                     yajl->depth = (yajl->depth - 1);
295                     return CEPH_CB_CONTINUE;
296                 }
297             }
298             //probably a avgcount/sum pair. if not - we'll try full key later
299             latency_type = 1;
300             break;
301         }
302         strncat(key, ".", 1);
303         strncat(key, yajl->state[i].key, yajl->state[i].key_len+1);
304     }
305
306     result = yajl->handler(yajl->handler_arg, buffer, key);
307
308     if((result == RETRY_AVGCOUNT) && latency_type)
309     {
310         strncat(key, ".", 1);
311         strncat(key, yajl->state[yajl->depth-1].key,
312                 yajl->state[yajl->depth-1].key_len+1);
313         result = yajl->handler(yajl->handler_arg, buffer, key);
314     }
315
316     if(result == -ENOMEM)
317     {
318         ERROR("ceph plugin: memory allocation failed");
319         return CEPH_CB_ABORT;
320     }
321
322     yajl->depth = (yajl->depth - 1);
323     return CEPH_CB_CONTINUE;
324 }
325
326 static int ceph_cb_string(void *ctx, const unsigned char *string_val,
327         yajl_len_t string_len)
328 {
329     return CEPH_CB_CONTINUE;
330 }
331
332 static int ceph_cb_start_map(void *ctx)
333 {
334     return CEPH_CB_CONTINUE;
335 }
336
337 static int
338 ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
339 {
340     yajl_struct *yajl = (yajl_struct*)ctx;
341
342     if((yajl->depth+1)  >= YAJL_MAX_DEPTH)
343     {
344         ERROR("ceph plugin: depth exceeds max, aborting.");
345         return CEPH_CB_ABORT;
346     }
347
348     char buffer[string_len+1];
349
350     memcpy(buffer, key, string_len);
351     buffer[sizeof(buffer) - 1] = 0;
352
353     snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer);
354     yajl->state[yajl->depth].key_len = sizeof(buffer);
355     yajl->depth = (yajl->depth + 1);
356
357     return CEPH_CB_CONTINUE;
358 }
359
360 static int ceph_cb_end_map(void *ctx)
361 {
362     yajl_struct *yajl = (yajl_struct*)ctx;
363
364     yajl->depth = (yajl->depth - 1);
365     return CEPH_CB_CONTINUE;
366 }
367
368 static int ceph_cb_start_array(void *ctx)
369 {
370     return CEPH_CB_CONTINUE;
371 }
372
373 static int ceph_cb_end_array(void *ctx)
374 {
375     return CEPH_CB_CONTINUE;
376 }
377
378 static yajl_callbacks callbacks = {
379         ceph_cb_null,
380         ceph_cb_boolean,
381         NULL,
382         NULL,
383         ceph_cb_number,
384         ceph_cb_string,
385         ceph_cb_start_map,
386         ceph_cb_map_key,
387         ceph_cb_end_map,
388         ceph_cb_start_array,
389         ceph_cb_end_array
390 };
391
392 static void ceph_daemon_print(const struct ceph_daemon *d)
393 {
394     DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path);
395 }
396
397 static void ceph_daemons_print(void)
398 {
399     int i;
400     for(i = 0; i < g_num_daemons; ++i)
401     {
402         ceph_daemon_print(g_daemons[i]);
403     }
404 }
405
406 static void ceph_daemon_free(struct ceph_daemon *d)
407 {
408     int i = 0;
409     for(; i < d->last_idx; i++)
410     {
411         sfree(d->last_poll_data[i]);
412     }
413     sfree(d->last_poll_data);
414     d->last_poll_data = NULL;
415     d->last_idx = 0;
416     for(i = 0; i < d->ds_num; i++)
417     {
418         sfree(d->ds_names[i]);
419     }
420     sfree(d->ds_types);
421     sfree(d->ds_names);
422     sfree(d);
423 }
424
425 /* compact_ds_name removed the special characters ":", "_", "-" and "+" from the
426  * intput string. Characters following these special characters are capitalized.
427  * Trailing "+" and "-" characters are replaces with the strings "Plus" and
428  * "Minus". */
429 static int compact_ds_name (char *buffer, size_t buffer_size, char const *src)
430 {
431     char *src_copy;
432     size_t src_len;
433     char *ptr = buffer;
434     size_t ptr_size = buffer_size;
435     _Bool append_plus = 0;
436     _Bool append_minus = 0;
437
438     if ((buffer == NULL) || (buffer_size <= strlen ("Minus")) || (src == NULL))
439       return EINVAL;
440
441     src_copy = strdup (src);
442     src_len = strlen(src);
443
444     /* Remove trailing "+" and "-". */
445     if (src_copy[src_len - 1] == '+')
446     {
447         append_plus = 1;
448         src_len--;
449         src_copy[src_len] = 0;
450     }
451     else if (src_copy[src_len - 1] == '-')
452     {
453         append_minus = 1;
454         src_len--;
455         src_copy[src_len] = 0;
456     }
457
458     /* Split at special chars, capitalize first character, append to buffer. */
459     char *dummy = src_copy;
460     char *token;
461     char *save_ptr = NULL;
462     while ((token = strtok_r (dummy, ":_-+", &save_ptr)) != NULL)
463     {
464         size_t len;
465
466         dummy = NULL;
467
468         token[0] = toupper ((int) token[0]);
469
470         assert (ptr_size > 1);
471
472         len = strlen (token);
473         if (len >= ptr_size)
474             len = ptr_size - 1;
475
476         assert (len > 0);
477         assert (len < ptr_size);
478
479         sstrncpy (ptr, token, len + 1);
480         ptr += len;
481         ptr_size -= len;
482
483         assert (*ptr == 0);
484         if (ptr_size <= 1)
485             break;
486     }
487
488     /* Append "Plus" or "Minus" if "+" or "-" has been stripped above. */
489     if (append_plus || append_minus)
490     {
491         char const *append = "Plus";
492         if (append_minus)
493             append = "Minus";
494
495         size_t offset = buffer_size - (strlen (append) + 1);
496         if (offset > strlen (buffer))
497             offset = strlen (buffer);
498
499         sstrncpy (buffer + offset, append, buffer_size - offset);
500     }
501
502     sfree (src_copy);
503     return 0;
504 }
505
506 static _Bool has_suffix (char const *str, char const *suffix)
507 {
508     size_t str_len = strlen (str);
509     size_t suffix_len = strlen (suffix);
510     size_t offset;
511
512     if (suffix_len > str_len)
513         return 0;
514     offset = str_len - suffix_len;
515
516     if (strcmp (str + offset, suffix) == 0)
517         return 1;
518
519     return 0;
520 }
521
522 /* count_parts returns the number of elements a "foo.bar.baz" style key has. */
523 static size_t count_parts (char const *key)
524 {
525     char const *ptr;
526     size_t parts_num = 0;
527
528     for (ptr = key; ptr != NULL; ptr = strchr (ptr + 1, '.'))
529         parts_num++;
530
531     return parts_num;
532 }
533
534 /**
535  * Parse key to remove "type" if this is for schema and initiate compaction
536  */
537 static int parse_keys (char *buffer, size_t buffer_size, const char *key_str)
538 {
539     char tmp[2 * buffer_size];
540
541     if (buffer == NULL || buffer_size == 0 || key_str == NULL || strlen (key_str) == 0)
542         return EINVAL;
543
544     if ((count_parts (key_str) > 2) && has_suffix (key_str, ".type"))
545     {
546         /* strip ".type" suffix iff the key has more than two parts. */
547         size_t sz = strlen (key_str) - strlen (".type") + 1;
548
549         if (sz > sizeof (tmp))
550             sz = sizeof (tmp);
551         sstrncpy (tmp, key_str, sz);
552     }
553     else
554     {
555         sstrncpy (tmp, key_str, sizeof (tmp));
556     }
557
558     return compact_ds_name (buffer, buffer_size, tmp);
559 }
560
561 /**
562  * while parsing ceph admin socket schema, save counter name and type for later
563  * data processing
564  */
565 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
566         int pc_type)
567 {
568     uint32_t type;
569     char ds_name[DATA_MAX_NAME_LEN];
570     memset(ds_name, 0, sizeof(ds_name));
571
572     if(convert_special_metrics)
573     {
574         /**
575          * Special case for filestore:JournalWrBytes. For some reason, Ceph
576          * schema encodes this as a count/sum pair while all other "Bytes" data
577          * (excluding used/capacity bytes for OSD space) uses a single "Derive"
578          * type. To spare further confusion, keep this KPI as the same type of
579          * other "Bytes". Instead of keeping an "average" or "rate", use the
580          * "sum" in the pair and assign that to the derive value.
581          */
582         if((strcmp(name,"filestore.journal_wr_bytes.type") == 0))
583         {
584             pc_type = 10;
585         }
586     }
587
588     d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1));
589     if(!d->ds_names)
590     {
591         return -ENOMEM;
592     }
593
594     d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1));
595     if(!d->ds_types)
596     {
597         return -ENOMEM;
598     }
599
600     d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN);
601     if(!d->ds_names[d->ds_num])
602     {
603         return -ENOMEM;
604     }
605
606     type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE :
607             ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
608     d->ds_types[d->ds_num] = type;
609
610     if (parse_keys(ds_name, sizeof (ds_name), name))
611     {
612         return 1;
613     }
614
615     sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
616     d->ds_num = (d->ds_num + 1);
617
618     return 0;
619 }
620
621 /******* ceph_config *******/
622 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
623 {
624     const char *val;
625     if(item->values_num != 1)
626     {
627         return -ENOTSUP;
628     }
629     if(item->values[0].type != OCONFIG_TYPE_STRING)
630     {
631         return -ENOTSUP;
632     }
633     val = item->values[0].value.string;
634     if(snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
635     {
636         ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
637                 item->key);
638         return -ENAMETOOLONG;
639     }
640     return 0;
641 }
642
643 static int cc_handle_bool(struct oconfig_item_s *item, int *dest)
644 {
645     if(item->values_num != 1)
646     {
647         return -ENOTSUP;
648     }
649
650     if(item->values[0].type != OCONFIG_TYPE_BOOLEAN)
651     {
652         return -ENOTSUP;
653     }
654
655     *dest = (item->values[0].value.boolean) ? 1 : 0;
656     return 0;
657 }
658
659 static int cc_add_daemon_config(oconfig_item_t *ci)
660 {
661     int ret, i;
662     struct ceph_daemon *nd, cd;
663     struct ceph_daemon **tmp;
664     memset(&cd, 0, sizeof(struct ceph_daemon));
665
666     if((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
667     {
668         WARNING("ceph plugin: `Daemon' blocks need exactly one string "
669                 "argument.");
670         return (-1);
671     }
672
673     ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
674     if(ret)
675     {
676         return ret;
677     }
678
679     for(i=0; i < ci->children_num; i++)
680     {
681         oconfig_item_t *child = ci->children + i;
682
683         if(strcasecmp("SocketPath", child->key) == 0)
684         {
685             ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
686             if(ret)
687             {
688                 return ret;
689             }
690         }
691         else
692         {
693             WARNING("ceph plugin: ignoring unknown option %s", child->key);
694         }
695     }
696     if(cd.name[0] == '\0')
697     {
698         ERROR("ceph plugin: you must configure a daemon name.\n");
699         return -EINVAL;
700     }
701     else if(cd.asok_path[0] == '\0')
702     {
703         ERROR("ceph plugin(name=%s): you must configure an administrative "
704         "socket path.\n", cd.name);
705         return -EINVAL;
706     }
707     else if(!((cd.asok_path[0] == '/') ||
708             (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
709     {
710         ERROR("ceph plugin(name=%s): administrative socket paths must begin "
711                 "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
712         return -EINVAL;
713     }
714
715     tmp = realloc(g_daemons, (g_num_daemons+1) * sizeof(*g_daemons));
716     if(tmp == NULL)
717     {
718         /* The positive return value here indicates that this is a
719          * runtime error, not a configuration error.  */
720         return ENOMEM;
721     }
722     g_daemons = tmp;
723
724     nd = malloc(sizeof(*nd));
725     if(!nd)
726     {
727         return ENOMEM;
728     }
729     memcpy(nd, &cd, sizeof(*nd));
730     g_daemons[g_num_daemons++] = nd;
731     return 0;
732 }
733
734 static int ceph_config(oconfig_item_t *ci)
735 {
736     int ret, i;
737
738     for(i = 0; i < ci->children_num; ++i)
739     {
740         oconfig_item_t *child = ci->children + i;
741         if(strcasecmp("Daemon", child->key) == 0)
742         {
743             ret = cc_add_daemon_config(child);
744             if(ret == ENOMEM)
745             {
746                 ERROR("ceph plugin: Couldn't allocate memory");
747                 return ret;
748             }
749             else if(ret)
750             {
751                 //process other daemons and ignore this one
752                 continue;
753             }
754         }
755         else if(strcasecmp("LongRunAvgLatency", child->key) == 0)
756         {
757             ret = cc_handle_bool(child, &long_run_latency_avg);
758             if(ret)
759             {
760                 return ret;
761             }
762         }
763         else if(strcasecmp("ConvertSpecialMetricTypes", child->key) == 0)
764         {
765             ret = cc_handle_bool(child, &convert_special_metrics);
766             if(ret)
767             {
768                 return ret;
769             }
770         }
771         else
772         {
773             WARNING("ceph plugin: ignoring unknown option %s", child->key);
774         }
775     }
776     return 0;
777 }
778
779 /**
780  * Parse JSON and get error message if present
781  */
782 static int
783 traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
784 {
785     yajl_status status = yajl_parse(hand, json, json_len);
786     unsigned char *msg;
787
788     switch(status)
789     {
790         case yajl_status_error:
791             msg = yajl_get_error(hand, /* verbose = */ 1,
792                                        /* jsonText = */ (unsigned char *) json,
793                                                       (unsigned int) json_len);
794             ERROR ("ceph plugin: yajl_parse failed: %s", msg);
795             yajl_free_error(hand, msg);
796             return 1;
797         case yajl_status_client_canceled:
798             return 1;
799         default:
800             return 0;
801     }
802 }
803
804 /**
805  * Add entry for each counter while parsing schema
806  */
807 static int
808 node_handler_define_schema(void *arg, const char *val, const char *key)
809 {
810     struct ceph_daemon *d = (struct ceph_daemon *) arg;
811     int pc_type;
812     pc_type = atoi(val);
813     return ceph_daemon_add_ds_entry(d, key, pc_type);
814 }
815
816 /**
817  * Latency counter does not yet have an entry in last poll data - add it.
818  */
819 static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum,
820         uint64_t cur_count)
821 {
822     d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data));
823     if(!d->last_poll_data[d->last_idx])
824     {
825         return -ENOMEM;
826     }
827     sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n,
828             sizeof(d->last_poll_data[d->last_idx]->ds_name));
829     d->last_poll_data[d->last_idx]->last_sum = cur_sum;
830     d->last_poll_data[d->last_idx]->last_count = cur_count;
831     d->last_idx = (d->last_idx + 1);
832     return 0;
833 }
834
835 /**
836  * Update latency counter or add new entry if it doesn't exist
837  */
838 static int update_last(struct ceph_daemon *d, const char *ds_n, int index,
839         double cur_sum, uint64_t cur_count)
840 {
841     if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0))
842     {
843         d->last_poll_data[index]->last_sum = cur_sum;
844         d->last_poll_data[index]->last_count = cur_count;
845         return 0;
846     }
847
848     if(!d->last_poll_data)
849     {
850         d->last_poll_data = malloc(1 * sizeof(struct last_data *));
851         if(!d->last_poll_data)
852         {
853             return -ENOMEM;
854         }
855     }
856     else
857     {
858         struct last_data **tmp_last = realloc(d->last_poll_data,
859                 ((d->last_idx+1) * sizeof(struct last_data *)));
860         if(!tmp_last)
861         {
862             return -ENOMEM;
863         }
864         d->last_poll_data = tmp_last;
865     }
866     return add_last(d, ds_n, cur_sum, cur_count);
867 }
868
869 /**
870  * If using index guess failed (shouldn't happen, but possible if counters
871  * get rearranged), resort to searching for counter name
872  */
873 static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n)
874 {
875     int i = 0;
876     for(; i < d->last_idx; i++)
877     {
878         if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0)
879         {
880             return i;
881         }
882     }
883     return -1;
884 }
885
886 /**
887  * Calculate average b/t current data and last poll data
888  * if last poll data exists
889  */
890 static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index,
891         double cur_sum, uint64_t cur_count)
892 {
893     double result = -1.1, sum_delt = 0.0;
894     uint64_t count_delt = 0;
895     int tmp_index = 0;
896     if(d->last_idx > index)
897     {
898         if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)
899         {
900             tmp_index = index;
901         }
902         //test previous index
903         else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0))
904         {
905             tmp_index = (index - 1);
906         }
907         else
908         {
909             tmp_index = backup_search_for_last_avg(d, ds_n);
910         }
911
912         if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count))
913         {
914             sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum);
915             count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count);
916             result = (sum_delt / count_delt);
917         }
918     }
919
920     if(result == -1.1)
921     {
922         result = NAN;
923     }
924     if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM)
925     {
926         return -ENOMEM;
927     }
928     return result;
929 }
930
931 /**
932  * If using index guess failed, resort to searching for counter name
933  */
934 static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name)
935 {
936     int idx = 0;
937     for(; idx < d->ds_num; idx++)
938     {
939         if(strcmp(d->ds_names[idx], ds_name) == 0)
940         {
941             return d->ds_types[idx];
942         }
943     }
944     return DSET_TYPE_UNFOUND;
945 }
946
947 /**
948  * Process counter data and dispatch values
949  */
950 static int node_handler_fetch_data(void *arg, const char *val, const char *key)
951 {
952     value_t uv;
953     double tmp_d;
954     uint64_t tmp_u;
955     struct values_tmp *vtmp = (struct values_tmp*) arg;
956     uint32_t type = DSET_TYPE_UNFOUND;
957     int index = vtmp->index;
958
959     char ds_name[DATA_MAX_NAME_LEN];
960     memset(ds_name, 0, sizeof(ds_name));
961
962     if (parse_keys (ds_name, sizeof (ds_name), key))
963     {
964         return 1;
965     }
966
967     if(index >= vtmp->d->ds_num)
968     {
969         //don't overflow bounds of array
970         index = (vtmp->d->ds_num - 1);
971     }
972
973     /**
974      * counters should remain in same order we parsed schema... we maintain the
975      * index variable to keep track of current point in list of counters. first
976      * use index to guess point in array for retrieving type. if that doesn't
977      * work, use the old way to get the counter type
978      */
979     if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0)
980     {
981         //found match
982         type = vtmp->d->ds_types[index];
983     }
984     else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0))
985     {
986         //try previous key
987         type = vtmp->d->ds_types[index-1];
988     }
989
990     if(type == DSET_TYPE_UNFOUND)
991     {
992         //couldn't find right type by guessing, check the old way
993         type = backup_search_for_type(vtmp->d, ds_name);
994     }
995
996     switch(type)
997     {
998         case DSET_LATENCY:
999             if(vtmp->avgcount_exists == -1)
1000             {
1001                 sscanf(val, "%" PRIu64, &vtmp->avgcount);
1002                 vtmp->avgcount_exists = 0;
1003                 //return after saving avgcount - don't dispatch value
1004                 //until latency calculation
1005                 return 0;
1006             }
1007             else
1008             {
1009                 double sum, result;
1010                 sscanf(val, "%lf", &sum);
1011
1012                 if(vtmp->avgcount == 0)
1013                 {
1014                     vtmp->avgcount = 1;
1015                 }
1016
1017                 /** User wants latency values as long run avg */
1018                 if(long_run_latency_avg)
1019                 {
1020                     result = (sum / vtmp->avgcount);
1021                 }
1022                 else
1023                 {
1024                     result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount);
1025                     if(result == -ENOMEM)
1026                     {
1027                         return -ENOMEM;
1028                     }
1029                 }
1030
1031                 uv.gauge = result;
1032                 vtmp->avgcount_exists = -1;
1033                 vtmp->latency_index = (vtmp->latency_index + 1);
1034             }
1035             break;
1036         case DSET_BYTES:
1037             sscanf(val, "%lf", &tmp_d);
1038             uv.gauge = tmp_d;
1039             break;
1040         case DSET_RATE:
1041             sscanf(val, "%" PRIu64, &tmp_u);
1042             uv.derive = tmp_u;
1043             break;
1044         case DSET_TYPE_UNFOUND:
1045         default:
1046             ERROR("ceph plugin: ds %s was not properly initialized.", ds_name);
1047             return -1;
1048     }
1049
1050     sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type));
1051     sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance));
1052     vtmp->vlist.values = &uv;
1053     vtmp->vlist.values_len = 1;
1054
1055     vtmp->index = (vtmp->index + 1);
1056     plugin_dispatch_values(&vtmp->vlist);
1057
1058     return 0;
1059 }
1060
1061 static int cconn_connect(struct cconn *io)
1062 {
1063     struct sockaddr_un address;
1064     int flags, fd, err;
1065     if(io->state != CSTATE_UNCONNECTED)
1066     {
1067         ERROR("ceph plugin: cconn_connect: io->state != CSTATE_UNCONNECTED");
1068         return -EDOM;
1069     }
1070     fd = socket(PF_UNIX, SOCK_STREAM, 0);
1071     if(fd < 0)
1072     {
1073         int err = -errno;
1074         ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) "
1075             "failed: error %d", err);
1076         return err;
1077     }
1078     memset(&address, 0, sizeof(struct sockaddr_un));
1079     address.sun_family = AF_UNIX;
1080     snprintf(address.sun_path, sizeof(address.sun_path), "%s",
1081             io->d->asok_path);
1082     RETRY_ON_EINTR(err,
1083         connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
1084     if(err < 0)
1085     {
1086         ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d",
1087             fd, err);
1088         return err;
1089     }
1090
1091     flags = fcntl(fd, F_GETFL, 0);
1092     if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
1093     {
1094         err = -errno;
1095         ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d",
1096             fd, err);
1097         return err;
1098     }
1099     io->asok = fd;
1100     io->state = CSTATE_WRITE_REQUEST;
1101     io->amt = 0;
1102     io->json_len = 0;
1103     io->json = NULL;
1104     return 0;
1105 }
1106
1107 static void cconn_close(struct cconn *io)
1108 {
1109     io->state = CSTATE_UNCONNECTED;
1110     if(io->asok != -1)
1111     {
1112         int res;
1113         RETRY_ON_EINTR(res, close(io->asok));
1114     }
1115     io->asok = -1;
1116     io->amt = 0;
1117     io->json_len = 0;
1118     sfree(io->json);
1119     io->json = NULL;
1120 }
1121
1122 /* Process incoming JSON counter data */
1123 static int
1124 cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand)
1125 {
1126     int ret;
1127     struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
1128     if(!vtmp)
1129     {
1130         return -ENOMEM;
1131     }
1132
1133     vtmp->vlist = (value_list_t)VALUE_LIST_INIT;
1134     sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host));
1135     sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin));
1136     sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance));
1137
1138     vtmp->d = io->d;
1139     vtmp->avgcount_exists = -1;
1140     vtmp->latency_index = 0;
1141     vtmp->index = 0;
1142     yajl->handler_arg = vtmp;
1143     ret = traverse_json(io->json, io->json_len, hand);
1144     sfree(vtmp);
1145     return ret;
1146 }
1147
1148 /**
1149  * Initiate JSON parsing and print error if one occurs
1150  */
1151 static int cconn_process_json(struct cconn *io)
1152 {
1153     if((io->request_type != ASOK_REQ_DATA) &&
1154             (io->request_type != ASOK_REQ_SCHEMA))
1155     {
1156         return -EDOM;
1157     }
1158
1159     int result = 1;
1160     yajl_handle hand;
1161     yajl_status status;
1162
1163     hand = yajl_alloc(&callbacks,
1164 #if HAVE_YAJL_V2
1165       /* alloc funcs = */ NULL,
1166 #else
1167       /* alloc funcs = */ NULL, NULL,
1168 #endif
1169       /* context = */ (void *)(&io->yajl));
1170
1171     if(!hand)
1172     {
1173         ERROR ("ceph plugin: yajl_alloc failed.");
1174         return ENOMEM;
1175     }
1176
1177     io->yajl.depth = 0;
1178
1179     switch(io->request_type)
1180     {
1181         case ASOK_REQ_DATA:
1182             io->yajl.handler = node_handler_fetch_data;
1183             result = cconn_process_data(io, &io->yajl, hand);
1184             break;
1185         case ASOK_REQ_SCHEMA:
1186             //init daemon specific variables
1187             io->d->ds_num = 0;
1188             io->d->last_idx = 0;
1189             io->d->last_poll_data = NULL;
1190             io->yajl.handler = node_handler_define_schema;
1191             io->yajl.handler_arg = io->d;
1192             result = traverse_json(io->json, io->json_len, hand);
1193             break;
1194     }
1195
1196     if(result)
1197     {
1198         goto done;
1199     }
1200
1201 #if HAVE_YAJL_V2
1202     status = yajl_complete_parse(hand);
1203 #else
1204     status = yajl_parse_complete(hand);
1205 #endif
1206
1207     if (status != yajl_status_ok)
1208     {
1209       unsigned char *errmsg = yajl_get_error (hand, /* verbose = */ 0,
1210           /* jsonText = */ NULL, /* jsonTextLen = */ 0);
1211       ERROR ("ceph plugin: yajl_parse_complete failed: %s",
1212           (char *) errmsg);
1213       yajl_free_error (hand, errmsg);
1214       yajl_free (hand);
1215       return 1;
1216     }
1217
1218     done:
1219     yajl_free (hand);
1220     return result;
1221 }
1222
1223 static int cconn_validate_revents(struct cconn *io, int revents)
1224 {
1225     if(revents & POLLERR)
1226     {
1227         ERROR("ceph plugin: cconn_validate_revents(name=%s): got POLLERR",
1228             io->d->name);
1229         return -EIO;
1230     }
1231     switch (io->state)
1232     {
1233         case CSTATE_WRITE_REQUEST:
1234             return (revents & POLLOUT) ? 0 : -EINVAL;
1235         case CSTATE_READ_VERSION:
1236         case CSTATE_READ_AMT:
1237         case CSTATE_READ_JSON:
1238             return (revents & POLLIN) ? 0 : -EINVAL;
1239         default:
1240             ERROR("ceph plugin: cconn_validate_revents(name=%s) got to "
1241                 "illegal state on line %d", io->d->name, __LINE__);
1242             return -EDOM;
1243     }
1244 }
1245
1246 /** Handle a network event for a connection */
1247 static int cconn_handle_event(struct cconn *io)
1248 {
1249     int ret;
1250     switch (io->state)
1251     {
1252         case CSTATE_UNCONNECTED:
1253             ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
1254                 "state on line %d", io->d->name, __LINE__);
1255
1256             return -EDOM;
1257         case CSTATE_WRITE_REQUEST:
1258         {
1259             char cmd[32];
1260             snprintf(cmd, sizeof(cmd), "%s%d%s", "{ \"prefix\": \"",
1261                     io->request_type, "\" }\n");
1262             size_t cmd_len = strlen(cmd);
1263             RETRY_ON_EINTR(ret,
1264                   write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
1265             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
1266                     io->d->name, io->state, io->amt, ret);
1267             if(ret < 0)
1268             {
1269                 return ret;
1270             }
1271             io->amt += ret;
1272             if(io->amt >= cmd_len)
1273             {
1274                 io->amt = 0;
1275                 switch (io->request_type)
1276                 {
1277                     case ASOK_REQ_VERSION:
1278                         io->state = CSTATE_READ_VERSION;
1279                         break;
1280                     default:
1281                         io->state = CSTATE_READ_AMT;
1282                         break;
1283                 }
1284             }
1285             return 0;
1286         }
1287         case CSTATE_READ_VERSION:
1288         {
1289             RETRY_ON_EINTR(ret,
1290                     read(io->asok, ((char*)(&io->d->version)) + io->amt,
1291                             sizeof(io->d->version) - io->amt));
1292             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1293                     io->d->name, io->state, ret);
1294             if(ret < 0)
1295             {
1296                 return ret;
1297             }
1298             io->amt += ret;
1299             if(io->amt >= sizeof(io->d->version))
1300             {
1301                 io->d->version = ntohl(io->d->version);
1302                 if(io->d->version != 1)
1303                 {
1304                     ERROR("ceph plugin: cconn_handle_event(name=%s) not "
1305                         "expecting version %d!", io->d->name, io->d->version);
1306                     return -ENOTSUP;
1307                 }
1308                 DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as "
1309                         "version %d", io->d->name, io->d->version);
1310                 io->amt = 0;
1311                 cconn_close(io);
1312                 io->request_type = ASOK_REQ_SCHEMA;
1313             }
1314             return 0;
1315         }
1316         case CSTATE_READ_AMT:
1317         {
1318             RETRY_ON_EINTR(ret,
1319                     read(io->asok, ((char*)(&io->json_len)) + io->amt,
1320                             sizeof(io->json_len) - io->amt));
1321             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1322                     io->d->name, io->state, ret);
1323             if(ret < 0)
1324             {
1325                 return ret;
1326             }
1327             io->amt += ret;
1328             if(io->amt >= sizeof(io->json_len))
1329             {
1330                 io->json_len = ntohl(io->json_len);
1331                 io->amt = 0;
1332                 io->state = CSTATE_READ_JSON;
1333                 io->json = calloc(1, io->json_len + 1);
1334                 if(!io->json)
1335                 {
1336                     ERROR("ceph plugin: error callocing io->json");
1337                     return -ENOMEM;
1338                 }
1339             }
1340             return 0;
1341         }
1342         case CSTATE_READ_JSON:
1343         {
1344             RETRY_ON_EINTR(ret,
1345                    read(io->asok, io->json + io->amt, io->json_len - io->amt));
1346             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1347                     io->d->name, io->state, ret);
1348             if(ret < 0)
1349             {
1350                 return ret;
1351             }
1352             io->amt += ret;
1353             if(io->amt >= io->json_len)
1354             {
1355                 ret = cconn_process_json(io);
1356                 if(ret)
1357                 {
1358                     return ret;
1359                 }
1360                 cconn_close(io);
1361                 io->request_type = ASOK_REQ_NONE;
1362             }
1363             return 0;
1364         }
1365         default:
1366             ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
1367                 "state on line %d", io->d->name, __LINE__);
1368             return -EDOM;
1369     }
1370 }
1371
1372 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
1373 {
1374     int ret;
1375     if(io->request_type == ASOK_REQ_NONE)
1376     {
1377         /* The request has already been serviced. */
1378         return 0;
1379     }
1380     else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0))
1381     {
1382         /* If there are no counters to report on, don't bother
1383          * connecting */
1384         return 0;
1385     }
1386
1387     switch (io->state)
1388     {
1389         case CSTATE_UNCONNECTED:
1390             ret = cconn_connect(io);
1391             if(ret > 0)
1392             {
1393                 return -ret;
1394             }
1395             else if(ret < 0)
1396             {
1397                 return ret;
1398             }
1399             fds->fd = io->asok;
1400             fds->events = POLLOUT;
1401             return 1;
1402         case CSTATE_WRITE_REQUEST:
1403             fds->fd = io->asok;
1404             fds->events = POLLOUT;
1405             return 1;
1406         case CSTATE_READ_VERSION:
1407         case CSTATE_READ_AMT:
1408         case CSTATE_READ_JSON:
1409             fds->fd = io->asok;
1410             fds->events = POLLIN;
1411             return 1;
1412         default:
1413             ERROR("ceph plugin: cconn_prepare(name=%s) got to illegal state "
1414                 "on line %d", io->d->name, __LINE__);
1415             return -EDOM;
1416     }
1417 }
1418
1419 /** Returns the difference between two struct timevals in milliseconds.
1420  * On overflow, we return max/min int.
1421  */
1422 static int milli_diff(const struct timeval *t1, const struct timeval *t2)
1423 {
1424     int64_t ret;
1425     int sec_diff = t1->tv_sec - t2->tv_sec;
1426     int usec_diff = t1->tv_usec - t2->tv_usec;
1427     ret = usec_diff / 1000;
1428     ret += (sec_diff * 1000);
1429     return (ret > INT_MAX) ? INT_MAX : ((ret < INT_MIN) ? INT_MIN : (int)ret);
1430 }
1431
1432 /** This handles the actual network I/O to talk to the Ceph daemons.
1433  */
1434 static int cconn_main_loop(uint32_t request_type)
1435 {
1436     int i, ret, some_unreachable = 0;
1437     struct timeval end_tv;
1438     struct cconn io_array[g_num_daemons];
1439
1440     DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type);
1441
1442     /* create cconn array */
1443     memset(io_array, 0, sizeof(io_array));
1444     for(i = 0; i < g_num_daemons; ++i)
1445     {
1446         io_array[i].d = g_daemons[i];
1447         io_array[i].request_type = request_type;
1448         io_array[i].state = CSTATE_UNCONNECTED;
1449     }
1450
1451     /** Calculate the time at which we should give up */
1452     gettimeofday(&end_tv, NULL);
1453     end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
1454
1455     while (1)
1456     {
1457         int nfds, diff;
1458         struct timeval tv;
1459         struct cconn *polled_io_array[g_num_daemons];
1460         struct pollfd fds[g_num_daemons];
1461         memset(fds, 0, sizeof(fds));
1462         nfds = 0;
1463         for(i = 0; i < g_num_daemons; ++i)
1464         {
1465             struct cconn *io = io_array + i;
1466             ret = cconn_prepare(io, fds + nfds);
1467             if(ret < 0)
1468             {
1469                 WARNING("ceph plugin: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1470                         io->d->name, i, io->state, ret);
1471                 cconn_close(io);
1472                 io->request_type = ASOK_REQ_NONE;
1473                 some_unreachable = 1;
1474             }
1475             else if(ret == 1)
1476             {
1477                 polled_io_array[nfds++] = io_array + i;
1478             }
1479         }
1480         if(nfds == 0)
1481         {
1482             /* finished */
1483             ret = 0;
1484             goto done;
1485         }
1486         gettimeofday(&tv, NULL);
1487         diff = milli_diff(&end_tv, &tv);
1488         if(diff <= 0)
1489         {
1490             /* Timed out */
1491             ret = -ETIMEDOUT;
1492             WARNING("ceph plugin: cconn_main_loop: timed out.");
1493             goto done;
1494         }
1495         RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1496         if(ret < 0)
1497         {
1498             ERROR("ceph plugin: poll(2) error: %d", ret);
1499             goto done;
1500         }
1501         for(i = 0; i < nfds; ++i)
1502         {
1503             struct cconn *io = polled_io_array[i];
1504             int revents = fds[i].revents;
1505             if(revents == 0)
1506             {
1507                 /* do nothing */
1508             }
1509             else if(cconn_validate_revents(io, revents))
1510             {
1511                 WARNING("ceph plugin: cconn(name=%s,i=%d,st=%d): "
1512                 "revents validation error: "
1513                 "revents=0x%08x", io->d->name, i, io->state, revents);
1514                 cconn_close(io);
1515                 io->request_type = ASOK_REQ_NONE;
1516                 some_unreachable = 1;
1517             }
1518             else
1519             {
1520                 int ret = cconn_handle_event(io);
1521                 if(ret)
1522                 {
1523                     WARNING("ceph plugin: cconn_handle_event(name=%s,"
1524                     "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1525                     cconn_close(io);
1526                     io->request_type = ASOK_REQ_NONE;
1527                     some_unreachable = 1;
1528                 }
1529             }
1530         }
1531     }
1532     done: for(i = 0; i < g_num_daemons; ++i)
1533     {
1534         cconn_close(io_array + i);
1535     }
1536     if(some_unreachable)
1537     {
1538         DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable.");
1539     }
1540     else
1541     {
1542         DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)");
1543     }
1544     return ret;
1545 }
1546
1547 static int ceph_read(void)
1548 {
1549     return cconn_main_loop(ASOK_REQ_DATA);
1550 }
1551
1552 /******* lifecycle *******/
1553 static int ceph_init(void)
1554 {
1555     int ret;
1556     ceph_daemons_print();
1557
1558     ret = cconn_main_loop(ASOK_REQ_VERSION);
1559
1560     return (ret) ? ret : 0;
1561 }
1562
1563 static int ceph_shutdown(void)
1564 {
1565     int i;
1566     for(i = 0; i < g_num_daemons; ++i)
1567     {
1568         ceph_daemon_free(g_daemons[i]);
1569     }
1570     sfree(g_daemons);
1571     g_daemons = NULL;
1572     g_num_daemons = 0;
1573     DEBUG("ceph plugin: finished ceph_shutdown");
1574     return 0;
1575 }
1576
1577 void module_register(void)
1578 {
1579     plugin_register_complex_config("ceph", ceph_config);
1580     plugin_register_init("ceph", ceph_init);
1581     plugin_register_read("ceph", ceph_read);
1582     plugin_register_shutdown("ceph", ceph_shutdown);
1583 }