Merge branch 'collectd-5.4' into collectd-5.5
[collectd.git] / src / ceph.c
1 /**
2  * collectd - src/ceph.c
3  * Copyright (C) 2011  New Dream Network
4  * Copyright (C) 2015  Florian octo Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Colin McCabe <cmccabe at alumni.cmu.edu>
21  *   Dennis Zou <yunzou at cisco.com>
22  *   Dan Ryder <daryder at cisco.com>
23  *   Florian octo Forster <octo at collectd.org>
24  **/
25
26 #define _DEFAULT_SOURCE
27 #define _BSD_SOURCE
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32
33 #include <arpa/inet.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <yajl/yajl_parse.h>
37 #if HAVE_YAJL_YAJL_VERSION_H
38 #include <yajl/yajl_version.h>
39 #endif
40
41 #include <limits.h>
42 #include <poll.h>
43 #include <stdint.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47 #include <strings.h>
48 #include <sys/socket.h>
49 #include <sys/time.h>
50 #include <sys/types.h>
51 #include <sys/un.h>
52 #include <unistd.h>
53 #include <math.h>
54 #include <inttypes.h>
55
56 #define RETRY_AVGCOUNT -1
57
58 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
59 # define HAVE_YAJL_V2 1
60 #endif
61
62 #define RETRY_ON_EINTR(ret, expr) \
63     while(1) { \
64         ret = expr; \
65         if(ret >= 0) \
66             break; \
67         ret = -errno; \
68         if(ret != -EINTR) \
69             break; \
70     }
71
72 /** Timeout interval in seconds */
73 #define CEPH_TIMEOUT_INTERVAL 1
74
75 /** Maximum path length for a UNIX domain socket on this system */
76 #define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
77
78 /** Yajl callback returns */
79 #define CEPH_CB_CONTINUE 1
80 #define CEPH_CB_ABORT 0
81
82 #if HAVE_YAJL_V2
83 typedef size_t yajl_len_t;
84 #else
85 typedef unsigned int yajl_len_t;
86 #endif
87
88 /** Number of types for ceph defined in types.db */
89 #define CEPH_DSET_TYPES_NUM 3
90 /** ceph types enum */
91 enum ceph_dset_type_d
92 {
93     DSET_LATENCY = 0,
94     DSET_BYTES = 1,
95     DSET_RATE = 2,
96     DSET_TYPE_UNFOUND = 1000
97 };
98
99 /** Valid types for ceph defined in types.db */
100 const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] =
101                                    {"ceph_latency", "ceph_bytes", "ceph_rate"};
102
103 /******* ceph_daemon *******/
104 struct ceph_daemon
105 {
106     /** Version of the admin_socket interface */
107     uint32_t version;
108     /** daemon name **/
109     char name[DATA_MAX_NAME_LEN];
110
111     /** Path to the socket that we use to talk to the ceph daemon */
112     char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
113
114     /** Number of counters */
115     int ds_num;
116     /** Track ds types */
117     uint32_t *ds_types;
118     /** Track ds names to match with types */
119     char **ds_names;
120
121     /**
122      * Keep track of last data for latency values so we can calculate rate
123      * since last poll.
124      */
125     struct last_data **last_poll_data;
126     /** index of last poll data */
127     int last_idx;
128 };
129
130 /******* JSON parsing *******/
131 typedef int (*node_handler_t)(void *, const char*, const char*);
132
133 /** Track state and handler while parsing JSON */
134 struct yajl_struct
135 {
136     node_handler_t handler;
137     void * handler_arg;
138     struct {
139       char key[DATA_MAX_NAME_LEN];
140       int key_len;
141     } state[YAJL_MAX_DEPTH];
142     int depth;
143 };
144 typedef struct yajl_struct yajl_struct;
145
146 enum perfcounter_type_d
147 {
148     PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
149 };
150
151 /** Give user option to use default (long run = since daemon started) avg */
152 static int long_run_latency_avg = 0;
153
154 /**
155  * Give user option to use default type for special cases -
156  * filestore.journal_wr_bytes is currently only metric here. Ceph reports the
157  * type as a sum/count pair and will calculate it the same as a latency value.
158  * All other "bytes" metrics (excluding the used/capacity bytes for the OSD)
159  * use the DERIVE type. Unless user specifies to use given type, convert this
160  * metric to use DERIVE.
161  */
162 static int convert_special_metrics = 1;
163
164 /** Array of daemons to monitor */
165 static struct ceph_daemon **g_daemons = NULL;
166
167 /** Number of elements in g_daemons */
168 static int g_num_daemons = 0;
169
170 /**
171  * A set of data that we build up in memory while parsing the JSON.
172  */
173 struct values_tmp
174 {
175     /** ceph daemon we are processing data for*/
176     struct ceph_daemon *d;
177     /** track avgcount across counters for avgcount/sum latency pairs */
178     uint64_t avgcount;
179     /** current index of counters - used to get type of counter */
180     int index;
181     /** do we already have an avgcount for latency pair */
182     int avgcount_exists;
183     /**
184      * similar to index, but current index of latency type counters -
185      * used to get last poll data of counter
186      */
187     int latency_index;
188     /**
189      * values list - maintain across counters since
190      * host/plugin/plugin instance are always the same
191      */
192     value_list_t vlist;
193 };
194
195 /**
196  * A set of count/sum pairs to keep track of latency types and get difference
197  * between this poll data and last poll data.
198  */
199 struct last_data
200 {
201     char ds_name[DATA_MAX_NAME_LEN];
202     double last_sum;
203     uint64_t last_count;
204 };
205
206 /******* network I/O *******/
207 enum cstate_t
208 {
209     CSTATE_UNCONNECTED = 0,
210     CSTATE_WRITE_REQUEST,
211     CSTATE_READ_VERSION,
212     CSTATE_READ_AMT,
213     CSTATE_READ_JSON,
214 };
215
216 enum request_type_t
217 {
218     ASOK_REQ_VERSION = 0,
219     ASOK_REQ_DATA = 1,
220     ASOK_REQ_SCHEMA = 2,
221     ASOK_REQ_NONE = 1000,
222 };
223
224 struct cconn
225 {
226     /** The Ceph daemon that we're talking to */
227     struct ceph_daemon *d;
228
229     /** Request type */
230     uint32_t request_type;
231
232     /** The connection state */
233     enum cstate_t state;
234
235     /** The socket we use to talk to this daemon */
236     int asok;
237
238     /** The amount of data remaining to read / write. */
239     uint32_t amt;
240
241     /** Length of the JSON to read */
242     uint32_t json_len;
243
244     /** Buffer containing JSON data */
245     unsigned char *json;
246
247     /** Keep data important to yajl processing */
248     struct yajl_struct yajl;
249 };
250
251 static int ceph_cb_null(void *ctx)
252 {
253     return CEPH_CB_CONTINUE;
254 }
255
256 static int ceph_cb_boolean(void *ctx, int bool_val)
257 {
258     return CEPH_CB_CONTINUE;
259 }
260
261 static int
262 ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
263 {
264     yajl_struct *yajl = (yajl_struct*)ctx;
265     char buffer[number_len+1];
266     int i, latency_type = 0, result;
267     char key[128];
268
269     memcpy(buffer, number_val, number_len);
270     buffer[sizeof(buffer) - 1] = 0;
271
272     ssnprintf(key, yajl->state[0].key_len, "%s", yajl->state[0].key);
273     for(i = 1; i < yajl->depth; i++)
274     {
275         if((i == yajl->depth-1) && ((strcmp(yajl->state[i].key,"avgcount") == 0)
276                 || (strcmp(yajl->state[i].key,"sum") == 0)))
277         {
278             if(convert_special_metrics)
279             {
280                 /**
281                  * Special case for filestore:JournalWrBytes. For some reason,
282                  * Ceph schema encodes this as a count/sum pair while all
283                  * other "Bytes" data (excluding used/capacity bytes for OSD
284                  * space) uses a single "Derive" type. To spare further
285                  * confusion, keep this KPI as the same type of other "Bytes".
286                  * Instead of keeping an "average" or "rate", use the "sum" in
287                  * the pair and assign that to the derive value.
288                  */
289                 if((strcmp(yajl->state[i-1].key, "journal_wr_bytes") == 0) &&
290                         (strcmp(yajl->state[i-2].key,"filestore") == 0) &&
291                         (strcmp(yajl->state[i].key,"avgcount") == 0))
292                 {
293                     DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
294                     yajl->depth = (yajl->depth - 1);
295                     return CEPH_CB_CONTINUE;
296                 }
297             }
298             //probably a avgcount/sum pair. if not - we'll try full key later
299             latency_type = 1;
300             break;
301         }
302         strncat(key, ".", 1);
303         strncat(key, yajl->state[i].key, yajl->state[i].key_len+1);
304     }
305
306     result = yajl->handler(yajl->handler_arg, buffer, key);
307
308     if((result == RETRY_AVGCOUNT) && latency_type)
309     {
310         strncat(key, ".", 1);
311         strncat(key, yajl->state[yajl->depth-1].key,
312                 yajl->state[yajl->depth-1].key_len+1);
313         result = yajl->handler(yajl->handler_arg, buffer, key);
314     }
315
316     if(result == -ENOMEM)
317     {
318         ERROR("ceph plugin: memory allocation failed");
319         return CEPH_CB_ABORT;
320     }
321
322     yajl->depth = (yajl->depth - 1);
323     return CEPH_CB_CONTINUE;
324 }
325
326 static int ceph_cb_string(void *ctx, const unsigned char *string_val,
327         yajl_len_t string_len)
328 {
329     return CEPH_CB_CONTINUE;
330 }
331
332 static int ceph_cb_start_map(void *ctx)
333 {
334     return CEPH_CB_CONTINUE;
335 }
336
337 static int
338 ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
339 {
340     yajl_struct *yajl = (yajl_struct*)ctx;
341
342     if((yajl->depth+1)  >= YAJL_MAX_DEPTH)
343     {
344         ERROR("ceph plugin: depth exceeds max, aborting.");
345         return CEPH_CB_ABORT;
346     }
347
348     char buffer[string_len+1];
349
350     memcpy(buffer, key, string_len);
351     buffer[sizeof(buffer) - 1] = 0;
352
353     snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer);
354     yajl->state[yajl->depth].key_len = sizeof(buffer);
355     yajl->depth = (yajl->depth + 1);
356
357     return CEPH_CB_CONTINUE;
358 }
359
360 static int ceph_cb_end_map(void *ctx)
361 {
362     yajl_struct *yajl = (yajl_struct*)ctx;
363
364     yajl->depth = (yajl->depth - 1);
365     return CEPH_CB_CONTINUE;
366 }
367
368 static int ceph_cb_start_array(void *ctx)
369 {
370     return CEPH_CB_CONTINUE;
371 }
372
373 static int ceph_cb_end_array(void *ctx)
374 {
375     return CEPH_CB_CONTINUE;
376 }
377
378 static yajl_callbacks callbacks = {
379         ceph_cb_null,
380         ceph_cb_boolean,
381         NULL,
382         NULL,
383         ceph_cb_number,
384         ceph_cb_string,
385         ceph_cb_start_map,
386         ceph_cb_map_key,
387         ceph_cb_end_map,
388         ceph_cb_start_array,
389         ceph_cb_end_array
390 };
391
392 static void ceph_daemon_print(const struct ceph_daemon *d)
393 {
394     DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path);
395 }
396
397 static void ceph_daemons_print(void)
398 {
399     int i;
400     for(i = 0; i < g_num_daemons; ++i)
401     {
402         ceph_daemon_print(g_daemons[i]);
403     }
404 }
405
406 static void ceph_daemon_free(struct ceph_daemon *d)
407 {
408     int i = 0;
409     for(; i < d->last_idx; i++)
410     {
411         sfree(d->last_poll_data[i]);
412     }
413     sfree(d->last_poll_data);
414     d->last_poll_data = NULL;
415     d->last_idx = 0;
416     for(i = 0; i < d->ds_num; i++)
417     {
418         sfree(d->ds_names[i]);
419     }
420     sfree(d->ds_types);
421     sfree(d->ds_names);
422     sfree(d);
423 }
424
425 /* compact_ds_name removed the special characters ":", "_", "-" and "+" from the
426  * intput string. Characters following these special characters are capitalized.
427  * Trailing "+" and "-" characters are replaces with the strings "Plus" and
428  * "Minus". */
429 static int compact_ds_name (char *buffer, size_t buffer_size, char const *src)
430 {
431     char *src_copy;
432     size_t src_len;
433     char *ptr = buffer;
434     size_t ptr_size = buffer_size;
435     _Bool append_plus = 0;
436     _Bool append_minus = 0;
437
438     if ((buffer == NULL) || (buffer_size <= strlen ("Minus")) || (src == NULL))
439       return EINVAL;
440
441     src_copy = strdup (src);
442     src_len = strlen(src);
443
444     /* Remove trailing "+" and "-". */
445     if (src_copy[src_len - 1] == '+')
446     {
447         append_plus = 1;
448         src_len--;
449         src_copy[src_len] = 0;
450     }
451     else if (src_copy[src_len - 1] == '-')
452     {
453         append_minus = 1;
454         src_len--;
455         src_copy[src_len] = 0;
456     }
457
458     /* Split at special chars, capitalize first character, append to buffer. */
459     char *dummy = src_copy;
460     char *token;
461     char *save_ptr = NULL;
462     while ((token = strtok_r (dummy, ":_-+", &save_ptr)) != NULL)
463     {
464         size_t len;
465
466         dummy = NULL;
467
468         token[0] = toupper ((int) token[0]);
469
470         assert (ptr_size > 1);
471
472         len = strlen (token);
473         if (len >= ptr_size)
474             len = ptr_size - 1;
475
476         assert (len > 0);
477         assert (len < ptr_size);
478
479         sstrncpy (ptr, token, len + 1);
480         ptr += len;
481         ptr_size -= len;
482
483         assert (*ptr == 0);
484         if (ptr_size <= 1)
485             break;
486     }
487
488     /* Append "Plus" or "Minus" if "+" or "-" has been stripped above. */
489     if (append_plus || append_minus)
490     {
491         char const *append = "Plus";
492         if (append_minus)
493             append = "Minus";
494
495         size_t offset = buffer_size - (strlen (append) + 1);
496         if (offset > strlen (buffer))
497             offset = strlen (buffer);
498
499         sstrncpy (buffer + offset, append, buffer_size - offset);
500     }
501
502     sfree (src_copy);
503     return 0;
504 }
505
506 static _Bool has_suffix (char const *str, char const *suffix)
507 {
508     size_t str_len = strlen (str);
509     size_t suffix_len = strlen (suffix);
510     size_t offset;
511
512     if (suffix_len > str_len)
513         return 0;
514     offset = str_len - suffix_len;
515
516     if (strcmp (str + offset, suffix) == 0)
517         return 1;
518
519     return 0;
520 }
521
522 /* count_parts returns the number of elements a "foo.bar.baz" style key has. */
523 static size_t count_parts (char const *key)
524 {
525     char const *ptr;
526     size_t parts_num = 0;
527
528     for (ptr = key; ptr != NULL; ptr = strchr (ptr + 1, '.'))
529         parts_num++;
530
531     return parts_num;
532 }
533
534 /**
535  * Parse key to remove "type" if this is for schema and initiate compaction
536  */
537 static int parse_keys (char *buffer, size_t buffer_size, const char *key_str)
538 {
539     char tmp[2 * buffer_size];
540
541     if (buffer == NULL || buffer_size == 0 || key_str == NULL || strlen (key_str) == 0)
542         return EINVAL;
543
544     if ((count_parts (key_str) > 2) && has_suffix (key_str, ".type"))
545     {
546         /* strip ".type" suffix iff the key has more than two parts. */
547         size_t sz = strlen (key_str) - strlen (".type") + 1;
548
549         if (sz > sizeof (tmp))
550             sz = sizeof (tmp);
551         sstrncpy (tmp, key_str, sz);
552     }
553     else
554     {
555         sstrncpy (tmp, key_str, sizeof (tmp));
556     }
557
558     return compact_ds_name (buffer, buffer_size, tmp);
559 }
560
561 /**
562  * while parsing ceph admin socket schema, save counter name and type for later
563  * data processing
564  */
565 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
566         int pc_type)
567 {
568     uint32_t type;
569     char ds_name[DATA_MAX_NAME_LEN];
570     memset(ds_name, 0, sizeof(ds_name));
571
572     if(convert_special_metrics)
573     {
574         /**
575          * Special case for filestore:JournalWrBytes. For some reason, Ceph
576          * schema encodes this as a count/sum pair while all other "Bytes" data
577          * (excluding used/capacity bytes for OSD space) uses a single "Derive"
578          * type. To spare further confusion, keep this KPI as the same type of
579          * other "Bytes". Instead of keeping an "average" or "rate", use the
580          * "sum" in the pair and assign that to the derive value.
581          */
582         if((strcmp(name,"filestore.journal_wr_bytes.type") == 0))
583         {
584             pc_type = 10;
585         }
586     }
587
588     d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1));
589     if(!d->ds_names)
590     {
591         return -ENOMEM;
592     }
593
594     d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1));
595     if(!d->ds_types)
596     {
597         return -ENOMEM;
598     }
599
600     d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN);
601     if(!d->ds_names[d->ds_num])
602     {
603         return -ENOMEM;
604     }
605
606     type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE :
607             ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
608     d->ds_types[d->ds_num] = type;
609
610     if (parse_keys(ds_name, sizeof (ds_name), name))
611     {
612         return 1;
613     }
614
615     sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
616     d->ds_num = (d->ds_num + 1);
617
618     return 0;
619 }
620
621 /******* ceph_config *******/
622 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
623 {
624     const char *val;
625     if(item->values_num != 1)
626     {
627         return -ENOTSUP;
628     }
629     if(item->values[0].type != OCONFIG_TYPE_STRING)
630     {
631         return -ENOTSUP;
632     }
633     val = item->values[0].value.string;
634     if(snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
635     {
636         ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
637                 item->key);
638         return -ENAMETOOLONG;
639     }
640     return 0;
641 }
642
643 static int cc_handle_bool(struct oconfig_item_s *item, int *dest)
644 {
645     if(item->values_num != 1)
646     {
647         return -ENOTSUP;
648     }
649
650     if(item->values[0].type != OCONFIG_TYPE_BOOLEAN)
651     {
652         return -ENOTSUP;
653     }
654
655     *dest = (item->values[0].value.boolean) ? 1 : 0;
656     return 0;
657 }
658
659 static int cc_add_daemon_config(oconfig_item_t *ci)
660 {
661     int ret, i;
662     struct ceph_daemon *nd, cd;
663     struct ceph_daemon **tmp;
664     memset(&cd, 0, sizeof(struct ceph_daemon));
665
666     if((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
667     {
668         WARNING("ceph plugin: `Daemon' blocks need exactly one string "
669                 "argument.");
670         return (-1);
671     }
672
673     ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
674     if(ret)
675     {
676         return ret;
677     }
678
679     for(i=0; i < ci->children_num; i++)
680     {
681         oconfig_item_t *child = ci->children + i;
682
683         if(strcasecmp("SocketPath", child->key) == 0)
684         {
685             ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
686             if(ret)
687             {
688                 return ret;
689             }
690         }
691         else
692         {
693             WARNING("ceph plugin: ignoring unknown option %s", child->key);
694         }
695     }
696     if(cd.name[0] == '\0')
697     {
698         ERROR("ceph plugin: you must configure a daemon name.\n");
699         return -EINVAL;
700     }
701     else if(cd.asok_path[0] == '\0')
702     {
703         ERROR("ceph plugin(name=%s): you must configure an administrative "
704         "socket path.\n", cd.name);
705         return -EINVAL;
706     }
707     else if(!((cd.asok_path[0] == '/') ||
708             (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
709     {
710         ERROR("ceph plugin(name=%s): administrative socket paths must begin "
711                 "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
712         return -EINVAL;
713     }
714
715     tmp = realloc(g_daemons, (g_num_daemons+1) * sizeof(*g_daemons));
716     if(tmp == NULL)
717     {
718         /* The positive return value here indicates that this is a
719          * runtime error, not a configuration error.  */
720         return ENOMEM;
721     }
722     g_daemons = tmp;
723
724     nd = malloc(sizeof(*nd));
725     if(!nd)
726     {
727         return ENOMEM;
728     }
729     memcpy(nd, &cd, sizeof(*nd));
730     g_daemons[g_num_daemons++] = nd;
731     return 0;
732 }
733
734 static int ceph_config(oconfig_item_t *ci)
735 {
736     int ret, i;
737
738     for(i = 0; i < ci->children_num; ++i)
739     {
740         oconfig_item_t *child = ci->children + i;
741         if(strcasecmp("Daemon", child->key) == 0)
742         {
743             ret = cc_add_daemon_config(child);
744             if(ret == ENOMEM)
745             {
746                 ERROR("ceph plugin: Couldn't allocate memory");
747                 return ret;
748             }
749             else if(ret)
750             {
751                 //process other daemons and ignore this one
752                 continue;
753             }
754         }
755         else if(strcasecmp("LongRunAvgLatency", child->key) == 0)
756         {
757             ret = cc_handle_bool(child, &long_run_latency_avg);
758             if(ret)
759             {
760                 return ret;
761             }
762         }
763         else if(strcasecmp("ConvertSpecialMetricTypes", child->key) == 0)
764         {
765             ret = cc_handle_bool(child, &convert_special_metrics);
766             if(ret)
767             {
768                 return ret;
769             }
770         }
771         else
772         {
773             WARNING("ceph plugin: ignoring unknown option %s", child->key);
774         }
775     }
776     return 0;
777 }
778
779 /**
780  * Parse JSON and get error message if present
781  */
782 static int
783 traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
784 {
785     yajl_status status = yajl_parse(hand, json, json_len);
786     unsigned char *msg;
787
788     switch(status)
789     {
790         case yajl_status_error:
791             msg = yajl_get_error(hand, /* verbose = */ 1,
792                                        /* jsonText = */ (unsigned char *) json,
793                                                       (unsigned int) json_len);
794             ERROR ("ceph plugin: yajl_parse failed: %s", msg);
795             yajl_free_error(hand, msg);
796             return 1;
797         case yajl_status_client_canceled:
798             return 1;
799         default:
800             return 0;
801     }
802 }
803
804 /**
805  * Add entry for each counter while parsing schema
806  */
807 static int
808 node_handler_define_schema(void *arg, const char *val, const char *key)
809 {
810     struct ceph_daemon *d = (struct ceph_daemon *) arg;
811     int pc_type;
812     pc_type = atoi(val);
813     return ceph_daemon_add_ds_entry(d, key, pc_type);
814 }
815
816 /**
817  * Latency counter does not yet have an entry in last poll data - add it.
818  */
819 static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum,
820         uint64_t cur_count)
821 {
822     d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data));
823     if(!d->last_poll_data[d->last_idx])
824     {
825         return -ENOMEM;
826     }
827     sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n,
828             sizeof(d->last_poll_data[d->last_idx]->ds_name));
829     d->last_poll_data[d->last_idx]->last_sum = cur_sum;
830     d->last_poll_data[d->last_idx]->last_count = cur_count;
831     d->last_idx = (d->last_idx + 1);
832     return 0;
833 }
834
835 /**
836  * Update latency counter or add new entry if it doesn't exist
837  */
838 static int update_last(struct ceph_daemon *d, const char *ds_n, int index,
839         double cur_sum, uint64_t cur_count)
840 {
841     if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0))
842     {
843         d->last_poll_data[index]->last_sum = cur_sum;
844         d->last_poll_data[index]->last_count = cur_count;
845         return 0;
846     }
847
848     if(!d->last_poll_data)
849     {
850         d->last_poll_data = malloc(1 * sizeof(struct last_data *));
851         if(!d->last_poll_data)
852         {
853             return -ENOMEM;
854         }
855     }
856     else
857     {
858         struct last_data **tmp_last = realloc(d->last_poll_data,
859                 ((d->last_idx+1) * sizeof(struct last_data *)));
860         if(!tmp_last)
861         {
862             return -ENOMEM;
863         }
864         d->last_poll_data = tmp_last;
865     }
866     return add_last(d, ds_n, cur_sum, cur_count);
867 }
868
869 /**
870  * If using index guess failed (shouldn't happen, but possible if counters
871  * get rearranged), resort to searching for counter name
872  */
873 static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n)
874 {
875     int i = 0;
876     for(; i < d->last_idx; i++)
877     {
878         if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0)
879         {
880             return i;
881         }
882     }
883     return -1;
884 }
885
886 /**
887  * Calculate average b/t current data and last poll data
888  * if last poll data exists
889  */
890 static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index,
891         double cur_sum, uint64_t cur_count)
892 {
893     double result = -1.1, sum_delt = 0.0;
894     uint64_t count_delt = 0;
895     int tmp_index = 0;
896     if(d->last_idx > index)
897     {
898         if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)
899         {
900             tmp_index = index;
901         }
902         //test previous index
903         else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0))
904         {
905             tmp_index = (index - 1);
906         }
907         else
908         {
909             tmp_index = backup_search_for_last_avg(d, ds_n);
910         }
911
912         if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count))
913         {
914             sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum);
915             count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count);
916             result = (sum_delt / count_delt);
917         }
918     }
919
920     if(result == -1.1)
921     {
922         result = NAN;
923     }
924     if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM)
925     {
926         return -ENOMEM;
927     }
928     return result;
929 }
930
931 /**
932  * If using index guess failed, resort to searching for counter name
933  */
934 static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name)
935 {
936     int idx = 0;
937     for(; idx < d->ds_num; idx++)
938     {
939         if(strcmp(d->ds_names[idx], ds_name) == 0)
940         {
941             return d->ds_types[idx];
942         }
943     }
944     return DSET_TYPE_UNFOUND;
945 }
946
947 /**
948  * Process counter data and dispatch values
949  */
950 static int node_handler_fetch_data(void *arg, const char *val, const char *key)
951 {
952     value_t uv;
953     double tmp_d;
954     uint64_t tmp_u;
955     struct values_tmp *vtmp = (struct values_tmp*) arg;
956     uint32_t type = DSET_TYPE_UNFOUND;
957     int index = vtmp->index;
958
959     char ds_name[DATA_MAX_NAME_LEN];
960     memset(ds_name, 0, sizeof(ds_name));
961
962     if (parse_keys (ds_name, sizeof (ds_name), key))
963     {
964         return 1;
965     }
966
967     if(index >= vtmp->d->ds_num)
968     {
969         //don't overflow bounds of array
970         index = (vtmp->d->ds_num - 1);
971     }
972
973     /**
974      * counters should remain in same order we parsed schema... we maintain the
975      * index variable to keep track of current point in list of counters. first
976      * use index to guess point in array for retrieving type. if that doesn't
977      * work, use the old way to get the counter type
978      */
979     if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0)
980     {
981         //found match
982         type = vtmp->d->ds_types[index];
983     }
984     else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0))
985     {
986         //try previous key
987         type = vtmp->d->ds_types[index-1];
988     }
989
990     if(type == DSET_TYPE_UNFOUND)
991     {
992         //couldn't find right type by guessing, check the old way
993         type = backup_search_for_type(vtmp->d, ds_name);
994     }
995
996     switch(type)
997     {
998         case DSET_LATENCY:
999             if(vtmp->avgcount_exists == -1)
1000             {
1001                 sscanf(val, "%" PRIu64, &vtmp->avgcount);
1002                 vtmp->avgcount_exists = 0;
1003                 //return after saving avgcount - don't dispatch value
1004                 //until latency calculation
1005                 return 0;
1006             }
1007             else
1008             {
1009                 double sum, result;
1010                 sscanf(val, "%lf", &sum);
1011
1012                 if(vtmp->avgcount == 0)
1013                 {
1014                     vtmp->avgcount = 1;
1015                 }
1016
1017                 /** User wants latency values as long run avg */
1018                 if(long_run_latency_avg)
1019                 {
1020                     result = (sum / vtmp->avgcount);
1021                 }
1022                 else
1023                 {
1024                     result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount);
1025                     if(result == -ENOMEM)
1026                     {
1027                         return -ENOMEM;
1028                     }
1029                 }
1030
1031                 uv.gauge = result;
1032                 vtmp->avgcount_exists = -1;
1033                 vtmp->latency_index = (vtmp->latency_index + 1);
1034             }
1035             break;
1036         case DSET_BYTES:
1037             sscanf(val, "%lf", &tmp_d);
1038             uv.gauge = tmp_d;
1039             break;
1040         case DSET_RATE:
1041             sscanf(val, "%" PRIu64, &tmp_u);
1042             uv.derive = tmp_u;
1043             break;
1044         case DSET_TYPE_UNFOUND:
1045         default:
1046             ERROR("ceph plugin: ds %s was not properly initialized.", ds_name);
1047             return -1;
1048     }
1049
1050     sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type));
1051     sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance));
1052     vtmp->vlist.values = &uv;
1053     vtmp->vlist.values_len = 1;
1054
1055     vtmp->index = (vtmp->index + 1);
1056     plugin_dispatch_values(&vtmp->vlist);
1057
1058     return 0;
1059 }
1060
1061 static int cconn_connect(struct cconn *io)
1062 {
1063     struct sockaddr_un address;
1064     int flags, fd, err;
1065     if(io->state != CSTATE_UNCONNECTED)
1066     {
1067         ERROR("ceph plugin: cconn_connect: io->state != CSTATE_UNCONNECTED");
1068         return -EDOM;
1069     }
1070     fd = socket(PF_UNIX, SOCK_STREAM, 0);
1071     if(fd < 0)
1072     {
1073         int err = -errno;
1074         ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) "
1075             "failed: error %d", err);
1076         return err;
1077     }
1078     memset(&address, 0, sizeof(struct sockaddr_un));
1079     address.sun_family = AF_UNIX;
1080     snprintf(address.sun_path, sizeof(address.sun_path), "%s",
1081             io->d->asok_path);
1082     RETRY_ON_EINTR(err,
1083         connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
1084     if(err < 0)
1085     {
1086         ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d",
1087             fd, err);
1088         close(fd);
1089         return err;
1090     }
1091
1092     flags = fcntl(fd, F_GETFL, 0);
1093     if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
1094     {
1095         err = -errno;
1096         ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d",
1097             fd, err);
1098         close(fd);
1099         return err;
1100     }
1101     io->asok = fd;
1102     io->state = CSTATE_WRITE_REQUEST;
1103     io->amt = 0;
1104     io->json_len = 0;
1105     io->json = NULL;
1106     return 0;
1107 }
1108
1109 static void cconn_close(struct cconn *io)
1110 {
1111     io->state = CSTATE_UNCONNECTED;
1112     if(io->asok != -1)
1113     {
1114         int res;
1115         RETRY_ON_EINTR(res, close(io->asok));
1116     }
1117     io->asok = -1;
1118     io->amt = 0;
1119     io->json_len = 0;
1120     sfree(io->json);
1121     io->json = NULL;
1122 }
1123
1124 /* Process incoming JSON counter data */
1125 static int
1126 cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand)
1127 {
1128     int ret;
1129     struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
1130     if(!vtmp)
1131     {
1132         return -ENOMEM;
1133     }
1134
1135     vtmp->vlist = (value_list_t)VALUE_LIST_INIT;
1136     sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host));
1137     sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin));
1138     sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance));
1139
1140     vtmp->d = io->d;
1141     vtmp->avgcount_exists = -1;
1142     vtmp->latency_index = 0;
1143     vtmp->index = 0;
1144     yajl->handler_arg = vtmp;
1145     ret = traverse_json(io->json, io->json_len, hand);
1146     sfree(vtmp);
1147     return ret;
1148 }
1149
1150 /**
1151  * Initiate JSON parsing and print error if one occurs
1152  */
1153 static int cconn_process_json(struct cconn *io)
1154 {
1155     if((io->request_type != ASOK_REQ_DATA) &&
1156             (io->request_type != ASOK_REQ_SCHEMA))
1157     {
1158         return -EDOM;
1159     }
1160
1161     int result = 1;
1162     yajl_handle hand;
1163     yajl_status status;
1164
1165     hand = yajl_alloc(&callbacks,
1166 #if HAVE_YAJL_V2
1167       /* alloc funcs = */ NULL,
1168 #else
1169       /* alloc funcs = */ NULL, NULL,
1170 #endif
1171       /* context = */ (void *)(&io->yajl));
1172
1173     if(!hand)
1174     {
1175         ERROR ("ceph plugin: yajl_alloc failed.");
1176         return ENOMEM;
1177     }
1178
1179     io->yajl.depth = 0;
1180
1181     switch(io->request_type)
1182     {
1183         case ASOK_REQ_DATA:
1184             io->yajl.handler = node_handler_fetch_data;
1185             result = cconn_process_data(io, &io->yajl, hand);
1186             break;
1187         case ASOK_REQ_SCHEMA:
1188             //init daemon specific variables
1189             io->d->ds_num = 0;
1190             io->d->last_idx = 0;
1191             io->d->last_poll_data = NULL;
1192             io->yajl.handler = node_handler_define_schema;
1193             io->yajl.handler_arg = io->d;
1194             result = traverse_json(io->json, io->json_len, hand);
1195             break;
1196     }
1197
1198     if(result)
1199     {
1200         goto done;
1201     }
1202
1203 #if HAVE_YAJL_V2
1204     status = yajl_complete_parse(hand);
1205 #else
1206     status = yajl_parse_complete(hand);
1207 #endif
1208
1209     if (status != yajl_status_ok)
1210     {
1211       unsigned char *errmsg = yajl_get_error (hand, /* verbose = */ 0,
1212           /* jsonText = */ NULL, /* jsonTextLen = */ 0);
1213       ERROR ("ceph plugin: yajl_parse_complete failed: %s",
1214           (char *) errmsg);
1215       yajl_free_error (hand, errmsg);
1216       yajl_free (hand);
1217       return 1;
1218     }
1219
1220     done:
1221     yajl_free (hand);
1222     return result;
1223 }
1224
1225 static int cconn_validate_revents(struct cconn *io, int revents)
1226 {
1227     if(revents & POLLERR)
1228     {
1229         ERROR("ceph plugin: cconn_validate_revents(name=%s): got POLLERR",
1230             io->d->name);
1231         return -EIO;
1232     }
1233     switch (io->state)
1234     {
1235         case CSTATE_WRITE_REQUEST:
1236             return (revents & POLLOUT) ? 0 : -EINVAL;
1237         case CSTATE_READ_VERSION:
1238         case CSTATE_READ_AMT:
1239         case CSTATE_READ_JSON:
1240             return (revents & POLLIN) ? 0 : -EINVAL;
1241         default:
1242             ERROR("ceph plugin: cconn_validate_revents(name=%s) got to "
1243                 "illegal state on line %d", io->d->name, __LINE__);
1244             return -EDOM;
1245     }
1246 }
1247
1248 /** Handle a network event for a connection */
1249 static int cconn_handle_event(struct cconn *io)
1250 {
1251     int ret;
1252     switch (io->state)
1253     {
1254         case CSTATE_UNCONNECTED:
1255             ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
1256                 "state on line %d", io->d->name, __LINE__);
1257
1258             return -EDOM;
1259         case CSTATE_WRITE_REQUEST:
1260         {
1261             char cmd[32];
1262             snprintf(cmd, sizeof(cmd), "%s%d%s", "{ \"prefix\": \"",
1263                     io->request_type, "\" }\n");
1264             size_t cmd_len = strlen(cmd);
1265             RETRY_ON_EINTR(ret,
1266                   write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
1267             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
1268                     io->d->name, io->state, io->amt, ret);
1269             if(ret < 0)
1270             {
1271                 return ret;
1272             }
1273             io->amt += ret;
1274             if(io->amt >= cmd_len)
1275             {
1276                 io->amt = 0;
1277                 switch (io->request_type)
1278                 {
1279                     case ASOK_REQ_VERSION:
1280                         io->state = CSTATE_READ_VERSION;
1281                         break;
1282                     default:
1283                         io->state = CSTATE_READ_AMT;
1284                         break;
1285                 }
1286             }
1287             return 0;
1288         }
1289         case CSTATE_READ_VERSION:
1290         {
1291             RETRY_ON_EINTR(ret,
1292                     read(io->asok, ((char*)(&io->d->version)) + io->amt,
1293                             sizeof(io->d->version) - io->amt));
1294             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1295                     io->d->name, io->state, ret);
1296             if(ret < 0)
1297             {
1298                 return ret;
1299             }
1300             io->amt += ret;
1301             if(io->amt >= sizeof(io->d->version))
1302             {
1303                 io->d->version = ntohl(io->d->version);
1304                 if(io->d->version != 1)
1305                 {
1306                     ERROR("ceph plugin: cconn_handle_event(name=%s) not "
1307                         "expecting version %d!", io->d->name, io->d->version);
1308                     return -ENOTSUP;
1309                 }
1310                 DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as "
1311                         "version %d", io->d->name, io->d->version);
1312                 io->amt = 0;
1313                 cconn_close(io);
1314                 io->request_type = ASOK_REQ_SCHEMA;
1315             }
1316             return 0;
1317         }
1318         case CSTATE_READ_AMT:
1319         {
1320             RETRY_ON_EINTR(ret,
1321                     read(io->asok, ((char*)(&io->json_len)) + io->amt,
1322                             sizeof(io->json_len) - io->amt));
1323             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1324                     io->d->name, io->state, ret);
1325             if(ret < 0)
1326             {
1327                 return ret;
1328             }
1329             io->amt += ret;
1330             if(io->amt >= sizeof(io->json_len))
1331             {
1332                 io->json_len = ntohl(io->json_len);
1333                 io->amt = 0;
1334                 io->state = CSTATE_READ_JSON;
1335                 io->json = calloc(1, io->json_len + 1);
1336                 if(!io->json)
1337                 {
1338                     ERROR("ceph plugin: error callocing io->json");
1339                     return -ENOMEM;
1340                 }
1341             }
1342             return 0;
1343         }
1344         case CSTATE_READ_JSON:
1345         {
1346             RETRY_ON_EINTR(ret,
1347                    read(io->asok, io->json + io->amt, io->json_len - io->amt));
1348             DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
1349                     io->d->name, io->state, ret);
1350             if(ret < 0)
1351             {
1352                 return ret;
1353             }
1354             io->amt += ret;
1355             if(io->amt >= io->json_len)
1356             {
1357                 ret = cconn_process_json(io);
1358                 if(ret)
1359                 {
1360                     return ret;
1361                 }
1362                 cconn_close(io);
1363                 io->request_type = ASOK_REQ_NONE;
1364             }
1365             return 0;
1366         }
1367         default:
1368             ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
1369                 "state on line %d", io->d->name, __LINE__);
1370             return -EDOM;
1371     }
1372 }
1373
1374 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
1375 {
1376     int ret;
1377     if(io->request_type == ASOK_REQ_NONE)
1378     {
1379         /* The request has already been serviced. */
1380         return 0;
1381     }
1382     else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0))
1383     {
1384         /* If there are no counters to report on, don't bother
1385          * connecting */
1386         return 0;
1387     }
1388
1389     switch (io->state)
1390     {
1391         case CSTATE_UNCONNECTED:
1392             ret = cconn_connect(io);
1393             if(ret > 0)
1394             {
1395                 return -ret;
1396             }
1397             else if(ret < 0)
1398             {
1399                 return ret;
1400             }
1401             fds->fd = io->asok;
1402             fds->events = POLLOUT;
1403             return 1;
1404         case CSTATE_WRITE_REQUEST:
1405             fds->fd = io->asok;
1406             fds->events = POLLOUT;
1407             return 1;
1408         case CSTATE_READ_VERSION:
1409         case CSTATE_READ_AMT:
1410         case CSTATE_READ_JSON:
1411             fds->fd = io->asok;
1412             fds->events = POLLIN;
1413             return 1;
1414         default:
1415             ERROR("ceph plugin: cconn_prepare(name=%s) got to illegal state "
1416                 "on line %d", io->d->name, __LINE__);
1417             return -EDOM;
1418     }
1419 }
1420
1421 /** Returns the difference between two struct timevals in milliseconds.
1422  * On overflow, we return max/min int.
1423  */
1424 static int milli_diff(const struct timeval *t1, const struct timeval *t2)
1425 {
1426     int64_t ret;
1427     int sec_diff = t1->tv_sec - t2->tv_sec;
1428     int usec_diff = t1->tv_usec - t2->tv_usec;
1429     ret = usec_diff / 1000;
1430     ret += (sec_diff * 1000);
1431     return (ret > INT_MAX) ? INT_MAX : ((ret < INT_MIN) ? INT_MIN : (int)ret);
1432 }
1433
1434 /** This handles the actual network I/O to talk to the Ceph daemons.
1435  */
1436 static int cconn_main_loop(uint32_t request_type)
1437 {
1438     int i, ret, some_unreachable = 0;
1439     struct timeval end_tv;
1440     struct cconn io_array[g_num_daemons];
1441
1442     DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type);
1443
1444     /* create cconn array */
1445     memset(io_array, 0, sizeof(io_array));
1446     for(i = 0; i < g_num_daemons; ++i)
1447     {
1448         io_array[i].d = g_daemons[i];
1449         io_array[i].request_type = request_type;
1450         io_array[i].state = CSTATE_UNCONNECTED;
1451     }
1452
1453     /** Calculate the time at which we should give up */
1454     gettimeofday(&end_tv, NULL);
1455     end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
1456
1457     while (1)
1458     {
1459         int nfds, diff;
1460         struct timeval tv;
1461         struct cconn *polled_io_array[g_num_daemons];
1462         struct pollfd fds[g_num_daemons];
1463         memset(fds, 0, sizeof(fds));
1464         nfds = 0;
1465         for(i = 0; i < g_num_daemons; ++i)
1466         {
1467             struct cconn *io = io_array + i;
1468             ret = cconn_prepare(io, fds + nfds);
1469             if(ret < 0)
1470             {
1471                 WARNING("ceph plugin: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1472                         io->d->name, i, io->state, ret);
1473                 cconn_close(io);
1474                 io->request_type = ASOK_REQ_NONE;
1475                 some_unreachable = 1;
1476             }
1477             else if(ret == 1)
1478             {
1479                 polled_io_array[nfds++] = io_array + i;
1480             }
1481         }
1482         if(nfds == 0)
1483         {
1484             /* finished */
1485             ret = 0;
1486             goto done;
1487         }
1488         gettimeofday(&tv, NULL);
1489         diff = milli_diff(&end_tv, &tv);
1490         if(diff <= 0)
1491         {
1492             /* Timed out */
1493             ret = -ETIMEDOUT;
1494             WARNING("ceph plugin: cconn_main_loop: timed out.");
1495             goto done;
1496         }
1497         RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1498         if(ret < 0)
1499         {
1500             ERROR("ceph plugin: poll(2) error: %d", ret);
1501             goto done;
1502         }
1503         for(i = 0; i < nfds; ++i)
1504         {
1505             struct cconn *io = polled_io_array[i];
1506             int revents = fds[i].revents;
1507             if(revents == 0)
1508             {
1509                 /* do nothing */
1510             }
1511             else if(cconn_validate_revents(io, revents))
1512             {
1513                 WARNING("ceph plugin: cconn(name=%s,i=%d,st=%d): "
1514                 "revents validation error: "
1515                 "revents=0x%08x", io->d->name, i, io->state, revents);
1516                 cconn_close(io);
1517                 io->request_type = ASOK_REQ_NONE;
1518                 some_unreachable = 1;
1519             }
1520             else
1521             {
1522                 int ret = cconn_handle_event(io);
1523                 if(ret)
1524                 {
1525                     WARNING("ceph plugin: cconn_handle_event(name=%s,"
1526                     "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1527                     cconn_close(io);
1528                     io->request_type = ASOK_REQ_NONE;
1529                     some_unreachable = 1;
1530                 }
1531             }
1532         }
1533     }
1534     done: for(i = 0; i < g_num_daemons; ++i)
1535     {
1536         cconn_close(io_array + i);
1537     }
1538     if(some_unreachable)
1539     {
1540         DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable.");
1541     }
1542     else
1543     {
1544         DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)");
1545     }
1546     return ret;
1547 }
1548
1549 static int ceph_read(void)
1550 {
1551     return cconn_main_loop(ASOK_REQ_DATA);
1552 }
1553
1554 /******* lifecycle *******/
1555 static int ceph_init(void)
1556 {
1557     int ret;
1558     ceph_daemons_print();
1559
1560     ret = cconn_main_loop(ASOK_REQ_VERSION);
1561
1562     return (ret) ? ret : 0;
1563 }
1564
1565 static int ceph_shutdown(void)
1566 {
1567     int i;
1568     for(i = 0; i < g_num_daemons; ++i)
1569     {
1570         ceph_daemon_free(g_daemons[i]);
1571     }
1572     sfree(g_daemons);
1573     g_daemons = NULL;
1574     g_num_daemons = 0;
1575     DEBUG("ceph plugin: finished ceph_shutdown");
1576     return 0;
1577 }
1578
1579 void module_register(void)
1580 {
1581     plugin_register_complex_config("ceph", ceph_config);
1582     plugin_register_init("ceph", ceph_init);
1583     plugin_register_read("ceph", ceph_read);
1584     plugin_register_shutdown("ceph", ceph_shutdown);
1585 }