2 * collectd - src/ceph.c
3 * Copyright (C) 2011 New Dream Network
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Colin McCabe <cmccabe@alumni.cmu.edu>
20 * Dennis Zou <yunzou@cisco.com>
21 * Dan Ryder <daryder@cisco.com>
30 #include <arpa/inet.h>
33 #include <json/json.h>
34 #include <json/json_object_private.h> /* need for struct json_object_iter */
42 #include <sys/socket.h>
44 #include <sys/types.h>
// Size of the data-source name buffers in this file (the ds_name[] arrays) and
// the bound used when a data-source name is copied with snprintf.
47 #define MAX_RRD_DS_NAME_LEN 20
// NOTE(review): the continuation lines of this macro are not visible in this
// listing. Call sites pass a result variable and a syscall expression
// (close, poll), so presumably it repeats expr while it fails with EINTR -- confirm.
49 #define RETRY_ON_EINTR(ret, expr) \
59 /** Timeout interval in seconds */
60 #define CEPH_TIMEOUT_INTERVAL 1
62 /** Maximum path length for a UNIX domain socket on this system */
63 #define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
65 /******* ceph_daemon *******/
// Per-daemon state. The members visible here are the daemon name, the path of
// its admin socket and a dynamically allocated array of data sets.
// NOTE(review): other members used elsewhere in this file (version, dset_num,
// pc_types) are not visible in this listing -- confirm against the full struct.
68 /** Version of the admin_socket interface */
71 char name[DATA_MAX_NAME_LEN];
75 /** Path to the socket that we use to talk to the ceph daemon */
76 char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
78 /** The set of key/value pairs that this daemon reports
79 * dset.type The daemon name
80 * dset.ds_num Number of data sources (key/value pairs)
81 * dset.ds Dynamically allocated array of key/value pairs
83 //struct data_set_s dset;
84 /** Dynamically allocated array **/
85 struct data_set_s *dset;
// Bit flags describing a perf counter. This file tests them with bitwise AND
// against the integer pc_type values read from the daemon schema.
// NOTE(review): only two enumerators are visible in this listing.
89 enum perfcounter_type_d
91 PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
// g_daemons is grown by one pointer per configured Daemon block in
// cc_add_daemon_config (realloc), and each element is a separately
// malloc-ed struct ceph_daemon.
94 /** Array of daemons to monitor */
95 static struct ceph_daemon **g_daemons = NULL;
97 /** Number of elements in g_daemons */
98 static int g_num_daemons = 0;
// Log the name and admin-socket path of one daemon at DEBUG level.
100 static void ceph_daemon_print(const struct ceph_daemon *d)
102 DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
// Log every configured daemon by calling ceph_daemon_print on each of the
// g_num_daemons entries of g_daemons.
105 static void ceph_daemons_print(void)
108 for (i = 0; i < g_num_daemons; ++i)
110 ceph_daemon_print(g_daemons[i]);
// Array of pointers to per-counter records holding the sum and count seen at
// the previous poll. It is allocated and grown in update_last, filled by
// add_last, read by get_last_avg and released in ceph_shutdown.
114 struct last_data **last_poll_data = NULL;
// Older single-data-set variant of ceph_daemon_free, kept commented out.
117 /*static void ceph_daemon_free(struct ceph_daemon *d)
119 plugin_unregister_data_set(d->dset.type);
// Release the resources of one daemon. The visible statements walk the
// d->dset_num data sets, unregister the type name of each with
// plugin_unregister_data_set and free d->pc_types[i].
// NOTE(review): the index declaration and any further frees (dset array,
// pc_types array, d itself) are not visible in this listing.
123 static void ceph_daemon_free(struct ceph_daemon *d)
126 for (; i < d->dset_num; i++)
128 plugin_unregister_data_set((d->dset + i)->type);
130 sfree(d->pc_types[i]);
// Build a compact data-source name in dest from source.
// source is split in place on the characters ":_-+" (strtok_r modifies it), the
// first character of every piece is upper-cased and the pieces are
// concatenated. The result is cut to fit MAX_RRD_DS_NAME_LEN and a suffix is
// appended according to append_status:
//   0x1  source ended with a minus sign  -> "Minus"
//   0x2  source ended with a plus sign   -> "Plus"
//   0x4  the name had to be truncated    -> leading digits of strlen(source)
// NOTE(review): the case labels of the final switch and the computation of
// reserved are not visible in this listing.
137 static void compact_ds_name(char *source, char *dest)
140 char *save_ptr = NULL, *tmp_ptr = source;
143 char tmp[DATA_MAX_NAME_LEN];
146 memset(tmp, 0, sizeof(tmp));
// Nothing is done unless source is non-empty and dest is passed in empty.
147 if (source == NULL || dest == NULL || source[0] == '\0' || dest[0] != '\0')
151 size_t src_len = strlen(source);
152 snprintf(len_str, sizeof(len_str), "%zu", src_len);
153 unsigned char append_status = 0x0;
// The trailing sign must be recorded now: the tokenizer below drops it.
154 append_status |= (source[src_len - 1] == '-') ? 0x1 : 0x0;
155 append_status |= (source[src_len - 1] == '+') ? 0x2 : 0x0;
156 while ((keys[keys_num] = strtok_r(tmp_ptr, ":_-+", &save_ptr)) != NULL)
159 /** capitalize 1st char **/
// NOTE(review): toupper receives a plain char here; for bytes outside the
// unsigned char range that is undefined per the C standard.
160 keys[keys_num][0] = toupper(keys[keys_num][0]);
165 /** concatenate each part of source string **/
166 for (i = 0; i < keys_num; i++)
// The pieces all come from source, so their total length never exceeds
// strlen(source); tmp is safe as long as source fits in DATA_MAX_NAME_LEN.
168 strcat(tmp, keys[i]);
170 tmp[DATA_MAX_NAME_LEN - 1] = '\0';
171 /** to coordinate limitation of length of ds name from RRD
172 * we will truncate ds_name
173 * when the its length is more than
174 * MAX_RRD_DS_NAME_LEN
176 if (strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
178 append_status |= 0x4;
179 /** we should reserve space for
184 if (append_status & 0x1)
186 /** we should reserve space for
191 if (append_status & 0x2)
193 /** we should reserve space for
// Copy the base name, leaving reserved bytes free for the suffix.
198 snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
199 offset = strlen(dest);
200 switch (append_status)
203 memcpy(dest + offset, "Minus", 5);
// This copy takes 5 bytes from the 4-character literal, so it also copies the
// terminating NUL, unlike the 4-byte "Plus" copy further down.
206 memcpy(dest + offset, "Plus", 5);
// Only the first two bytes of the decimal length are appended; for a
// one-digit length the second byte is the NUL terminator of len_str.
209 memcpy(dest + offset, len_str, 2);
212 memcpy(dest + offset, "Minus", 5);
213 memcpy(dest + offset + 5, len_str, 2);
216 memcpy(dest + offset, "Plus", 4);
217 memcpy(dest + offset + 4, len_str, 2);
// Split a dotted key path into a data-set name and a data-source name.
//   dset_name  receives the text before the first dot (or the whole key when
//              there is no dot), at most DATA_MAX_NAME_LEN - 1 bytes
//   ds_name    receives compact_ds_name() of the remaining keys
// Both output buffers must be passed in empty (first byte NUL).
// NOTE(review): the return statements are not visible in this listing;
// callers treat a nonzero result as failure.
223 static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
226 size_t dset_name_len = 0;
227 size_t ds_name_len = 0;
228 char tmp_ds_name[DATA_MAX_NAME_LEN];
229 memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
230 if (dset_name == NULL || ds_name == NULL || key_str == NULL
231 || key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
// ptr is the first dot, rptr the last dot. Without any dot the whole key is
// used both as the data-set name and as the data-source name.
235 if ((ptr = strchr(key_str, '.')) == NULL
236 || (rptr = strrchr(key_str, '.')) == NULL)
238 strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
239 strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
243 (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
244 (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
245 memcpy(dset_name, key_str, dset_name_len);
// Distance between first and last dot; zero means the key has exactly one dot.
247 (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
248 if (ds_name_len == 0)
249 { /** only have two keys **/
// The comparison looks at 4 bytes only, so any last key that merely starts
// with "type" is treated the same way.
250 if (!strncmp(rptr + 1, "type", 4))
251 {/** if last key is "type",ignore **/
252 strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
255 {/** if last key isn't "type", copy last key **/
256 strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
259 else if (!strncmp(rptr + 1, "type", 4))
260 {/** more than two keys **/
// Keep only the keys between the first and the last dot.
261 memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
264 {/** copy whole keys **/
265 strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
267 compact: compact_ds_name(tmp_ds_name, ds_name);
// Linear search of the d->dset_num data sets of d for one whose type string
// equals name.
// NOTE(review): the return statements are not visible in this listing; callers
// use the result as an index into d->dset and presumably treat a negative
// value as not found -- confirm.
271 int get_matching_dset(const struct ceph_daemon *d, const char *name)
274 for (idx = 0; idx < d->dset_num; ++idx)
276 if (strcmp(d->dset[idx].type, name) == 0)
// Linear search of the first num_values data sources of dset for one whose
// name equals name.
// NOTE(review): the rest of the parameter list and the return statements are
// not visible in this listing; the caller uses the result as an index.
284 int get_matching_value(const struct data_set_s *dset, const char *name,
288 for (idx = 0; idx < num_values; ++idx)
290 if (strcmp(dset->ds[idx].name, name) == 0)
// Add one data source to daemon d.
// name is a dotted key path; parse_keys splits it into a data-set name and a
// compacted data-source name. When no existing data set matches, d->dset and
// d->pc_types are each grown by one slot. The chosen data set then gets one
// more data_source_s, and the matching pc_types row gets one more entry
// holding pc_type.
// Returns -ENAMETOOLONG when name (with its NUL) exceeds DATA_MAX_NAME_LEN.
// NOTE(review): the rest of the parameter list, the allocation-failure checks
// and the other return statements are not visible in this listing.
298 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
301 struct data_source_s *ds;
302 struct data_set_s *dset;
303 struct data_set_s *dset_array;
304 int **pc_types_array = NULL;
308 if (strlen(name) + 1 > DATA_MAX_NAME_LEN)
309 return -ENAMETOOLONG;
310 char dset_name[DATA_MAX_NAME_LEN];
311 char ds_name[MAX_RRD_DS_NAME_LEN];
// parse_keys requires both output buffers to start out empty.
312 memset(dset_name, 0, sizeof(dset_name));
313 memset(ds_name, 0, sizeof(ds_name));
314 if (parse_keys(name, dset_name, ds_name))
316 idx = get_matching_dset(d, dset_name);
318 {/* need to add a dset **/
319 dset_array = realloc(d->dset,
320 sizeof(struct data_set_s) * (d->dset_num + 1));
323 pc_types_array = realloc(d->pc_types,
324 sizeof(int *) * (d->dset_num + 1));
327 dset = &dset_array[d->dset_num];
328 /** this step is very important, otherwise,
329 * realloc for dset->ds will tricky because of
330 * a random addr in dset->ds
332 memset(dset, 0, sizeof(struct data_set_s));
334 snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
// A new data set starts with no pc_types row; realloc(NULL, ...) below
// allocates it.
335 pc_types = pc_types_array[d->dset_num] = NULL;
336 d->dset = dset_array;
// Existing data set: reuse its slot and its pc_types row.
340 dset = &d->dset[idx];
341 pc_types = d->pc_types[idx];
// Grow the data-source array and the pc_types row by one element each.
343 struct data_source_s *ds_array = realloc(dset->ds,
344 sizeof(struct data_source_s) * (dset->ds_num + 1));
349 pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
// New data set: publish the grown pc_types table, then store pc_type.
357 pc_types_array[d->dset_num] = pc_types_new;
358 d->pc_types = pc_types_array;
359 d->pc_types[d->dset_num][dset->ds_num] = pc_type;
// Existing data set: replace its row with the reallocated one.
364 d->pc_types[idx] = pc_types_new;
365 d->pc_types[idx][dset->ds_num] = pc_type;
// ds_num is incremented here, after it was used as the index above.
367 ds = &ds_array[dset->ds_num++];
368 snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
370 (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE;
372 /** Special case for filestore:JournalWrBytes, we don't want to
373 use what the Ceph schema gives us (sum/count pair) */
374 if((strcmp(dset_name,"filestore") == 0) &&
375 (strcmp(ds_name,"JournalWrBytes") == 0))
377 ds->type = DS_TYPE_DERIVE;
379 /** Use min of 0 for DERIVE types so we dont' get negative values
380 on Ceph service restart */
381 if(ds->type == DS_TYPE_DERIVE)
385 else if(ds->type == DS_TYPE_GAUGE)
394 /******* ceph_config *******/
// Copy the single string value of a config item into dest (dest_len bytes).
// The item must carry exactly one value and that value must be a string.
// Returns -ENAMETOOLONG when the value does not fit in dest.
// NOTE(review): the returns for the two validation failures and for success
// are not visible in this listing.
395 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
398 if (item->values_num != 1)
402 if (item->values[0].type != OCONFIG_TYPE_STRING)
406 val = item->values[0].value.string;
// snprintf returns the length it wanted to write; a larger value than
// dest_len - 1 means the copy was truncated.
407 if (snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
409 ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
411 return -ENAMETOOLONG;
// Parse one Daemon configuration block and append the daemon to g_daemons.
// The block argument becomes the daemon name and the SocketPath child becomes
// the admin-socket path. The settings are collected in the stack copy cd,
// validated, and only then copied into a heap object appended to g_daemons.
// NOTE(review): the return statements and allocation-failure checks are not
// visible in this listing.
416 static int cc_add_daemon_config(oconfig_item_t *ci)
// NOTE(review): array is declared as a pointer to struct although it receives
// the reallocated array of pointers; that is why it is cast on assignment to
// g_daemons below.
419 struct ceph_daemon *array, *nd, cd;
420 memset(&cd, 0, sizeof(struct ceph_daemon));
422 if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
424 WARNING("ceph plugin: `Daemon' blocks need exactly one string argument.");
428 ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
432 for (i=0; i < ci->children_num; i++)
434 oconfig_item_t *child = ci->children + i;
436 if (strcasecmp("SocketPath", child->key) == 0)
438 ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
444 WARNING("ceph plugin: ignoring unknown option %s", child->key);
// Validation: a name and a socket path are mandatory, and the path must start
// with a slash or with a dot followed by a slash.
447 if (cd.name[0] == '\0')
449 ERROR("ceph plugin: you must configure a daemon name.\n");
452 else if (cd.asok_path[0] == '\0')
454 ERROR("ceph plugin(name=%s): you must configure an administrative "
455 "socket path.\n", cd.name);
458 else if (!((cd.asok_path[0] == '/')
459 || (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
461 ERROR("ceph plugin(name=%s): administrative socket paths must begin with "
462 "'/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
// Grow the global pointer array by one slot, then store a heap copy of cd.
465 array = realloc(g_daemons,
466 sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
469 /* The positive return value here indicates that this is a
470 * runtime error, not a configuration error. */
473 g_daemons = (struct ceph_daemon**) array;
474 nd = malloc(sizeof(struct ceph_daemon));
477 memcpy(nd, &cd, sizeof(struct ceph_daemon));
478 g_daemons[g_num_daemons++] = nd;
// Configuration callback registered in module_register. It walks the children
// of the plugin block, hands every Daemon child (key compared without regard
// to case) to cc_add_daemon_config and warns about any other key.
// NOTE(review): handling of ret and the return statements are not visible in
// this listing.
482 static int ceph_config(oconfig_item_t *ci)
486 for (i = 0; i < ci->children_num; ++i)
488 oconfig_item_t *child = ci->children + i;
489 if (strcasecmp("Daemon", child->key) == 0)
491 ret = cc_add_daemon_config(child);
497 WARNING("ceph plugin: ignoring unknown option %s", child->key);
503 /******* JSON parsing *******/
// Callback type used by the JSON traversal. traverse_json_impl calls it as
// handler(handler_arg, value, key), where key is the dotted path of the
// member being visited.
504 typedef int (*node_handler_t)(void*, json_object*, const char*);
506 /** Perform a depth-first traversal of the JSON parse tree,
507 * calling node_handler at each node.*/
// key is a caller-owned buffer of max_key bytes holding the dotted path built
// so far. For each member of jo a dot and the member name are appended, the
// handler is called, and the function recurses into the member value.
// Returns -ENAMETOOLONG when the extended path would not fit in max_key.
// NOTE(review): the assignment of plen, the handling of ret after the handler
// call and the restoring of key between members are not visible in this listing.
508 static int traverse_json_impl(json_object *jo, char *key, int max_key,
509 node_handler_t handler, void *handler_arg)
511 struct json_object_iter iter;
// Only JSON objects have members to iterate.
514 if (json_object_get_type(jo) != json_type_object)
517 json_object_object_foreachC(jo, iter)
519 klen = strlen(iter.key);
520 if (plen + klen + 2 > max_key)
521 return -ENAMETOOLONG;
// The third argument of strncat limits the bytes appended, not the size of the
// destination; the length check above is what actually prevents an overflow.
523 strncat(key, ".", max_key); /* really should be strcat */
524 strncat(key, iter.key, max_key);
526 ret = handler(handler_arg, iter.val, key);
529 ret = traverse_json_impl(iter.val, key, max_key, handler,
// Parse the JSON text in json and run traverse_json_impl over the resulting
// tree with a local key buffer buf. The parsed tree is released with
// json_object_put once the traversal is done.
// NOTE(review): the declaration of buf, the check of the parse result and the
// return statement are not visible in this listing.
542 static int traverse_json(const char *json, node_handler_t handler,
548 root = json_tokener_parse(json);
551 int result = traverse_json_impl(root, buf, sizeof(buf), handler, handler_arg);
552 json_object_put(root);
// Traversal callback used for the schema request. arg is the struct
// ceph_daemon being described. For a node of JSON type int, the integer is
// taken as the perf-counter type and the key is registered through
// ceph_daemon_add_ds_entry.
// NOTE(review): what is returned for object nodes and for non-int nodes is
// not visible in this listing.
556 static int node_handler_define_schema(void *arg, json_object *jo,
559 struct ceph_daemon *d = (struct ceph_daemon *) arg;
561 if (json_object_get_type(jo) == json_type_object)
563 else if (json_object_get_type(jo) != json_type_int)
565 pc_type = json_object_get_int(jo);
566 DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
567 d->name, key, pc_type);
568 return ceph_daemon_add_ds_entry(d, key, pc_type);
576 /** A set of values_t data that we build up in memory while parsing the JSON. */
// d is the daemon the values belong to. vh is a trailing array with one
// holder per data set; cconn_process_data allocates the struct together with
// d->dset_num holders in one calloc.
// NOTE(review): vh[0] is the GNU zero-length-array spelling; the standard C99
// flexible array member is written vh[]. holder_num and the struct header
// are not visible in this listing.
579 struct ceph_daemon *d;
581 struct values_holder vh[0];
// Members of the per-counter record stored in last_poll_data: the data-set
// name and data-source name that identify the counter.
// NOTE(review): the last_sum and last_count members used by add_last,
// update_last and get_last_avg are not visible in this listing.
586 char dset_name[DATA_MAX_NAME_LEN];
587 char ds_name[MAX_RRD_DS_NAME_LEN];
// Allocate a new record in slot last_idx of last_poll_data and store the
// counter identity (dset_n, ds_n) and its current sum and count in it.
// The caller must already have made room for slot last_idx in the array.
// NOTE(review): the failure return and the increment of last_idx are not
// visible in this listing.
592 int add_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
594 last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
595 if(!last_poll_data[last_idx])
599 sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,sizeof(last_poll_data[last_idx]->dset_name));
600 sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,sizeof(last_poll_data[last_idx]->ds_name));
601 last_poll_data[last_idx]->last_sum = cur_sum;
602 last_poll_data[last_idx]->last_count = cur_count;
// Remember the current sum and count of a counter for the next poll.
// An existing record with the same data-set and data-source name is
// overwritten. Otherwise the pointer array is created (first use) or grown by
// one slot, and add_last appends a new record.
// NOTE(review): the early return after a successful overwrite and the
// allocation-failure handling are not visible in this listing.
607 int update_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
610 for(i = 0; i < last_idx; i++)
612 if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0)
614 if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0)
616 last_poll_data[i]->last_sum = cur_sum;
617 last_poll_data[i]->last_count = cur_count;
623 if(NULL == last_poll_data)
625 last_poll_data = malloc(1 * sizeof(struct last_data *));
// realloc goes through a temporary so the old array is not lost on failure.
633 struct last_data **tmp_last = realloc(last_poll_data, ((last_idx+1) * sizeof(struct last_data *)));
638 last_poll_data = tmp_last;
640 add_last(dset_n,ds_n,cur_sum,cur_count);
// Average value per event since the previous poll of one counter:
//   (cur_sum - last_sum) / (cur_count - last_count)
// using the record stored for (dset_n, ds_n). The current values are then
// stored through update_last for the next call.
// result starts at the sentinel -1.1; if it is still the sentinel at the end
// (no record found, or nothing computed) the function yields NAN.
// NOTE(review): what happens when cur_count is below the stored count, and the
// return statement, are not visible in this listing. If cur_count equals the
// stored count the division is by zero, which in floating point gives an
// infinity or NaN rather than a trap -- confirm this is intended.
644 double get_last_avg(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
647 double result = -1.1;
648 double sum_delt = 0.0;
649 uint64_t count_delt = 0;
650 for(i = 0; i < last_idx; i++)
652 if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0)
654 if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0)
656 if(cur_count < last_poll_data[i]->last_count)
660 sum_delt = (cur_sum - last_poll_data[i]->last_sum);
661 count_delt = (cur_count - last_poll_data[i]->last_count);
662 result = (sum_delt / count_delt);
668 result = (result == -1.1) ? NAN : result;
669 update_last(dset_n,ds_n,cur_sum,cur_count);
// Traversal callback used for the data request. arg is the struct values_tmp
// being filled. The key is mapped to a (data set, data source) pair and the
// JSON value is stored in the matching value_t slot:
//   filestore / JournalWrBytes  -> derive, taken from the "sum" member
//   PERFCOUNTER_LATENCY         -> gauge, interval average from get_last_avg
//   PERFCOUNTER_DERIVE          -> derive, taken from the value itself
//   anything else               -> gauge, taken from the value itself
// Returns 1 when the key cannot be parsed.
// NOTE(review): the checks on dset_idx and ds_idx, several early returns and
// the final return are not visible in this listing.
673 static int node_handler_fetch_data(void *arg, json_object *jo, const char *key)
675 int dset_idx, ds_idx;
677 char dset_name[DATA_MAX_NAME_LEN];
678 char ds_name[MAX_RRD_DS_NAME_LEN];
679 struct values_tmp *vtmp = (struct values_tmp*) arg;
// parse_keys requires both output buffers to start out empty.
680 memset(dset_name, 0, sizeof(dset_name));
681 memset(ds_name, 0, sizeof(ds_name));
682 if (parse_keys(key, dset_name, ds_name))
683 return 1;DEBUG("enter node_handler_fetch_data");
684 dset_idx = get_matching_dset(vtmp->d, dset_name);
687 ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
688 vtmp->d->dset[dset_idx].ds_num);
690 return 1;DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
691 dset_name,ds_name,dset_idx,ds_idx);
// uv is the slot for this counter inside the holder of its data set.
692 uv = &(vtmp->vh[dset_idx].values[ds_idx]);
694 /** Special case for filestore:JournalWrBytes, we don't want to
695 use what the Ceph schema gives us */
696 if((strcmp(dset_name,"filestore") == 0) &&
697 (strcmp(ds_name,"JournalWrBytes") == 0))
700 sum = json_object_object_get(jo, "sum");
703 uv->derive = (uint64_t) json_object_get_double(sum);
704 DEBUG("uv derive = %" PRIu64 "",(uint64_t) uv->derive);
706 else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY)
708 json_object *avgcount, *sum;
// A latency counter is expected to be an object with avgcount and sum members.
711 if (json_object_get_type(jo) != json_type_object)
713 avgcount = json_object_object_get(jo, "avgcount");
714 sum = json_object_object_get(jo, "sum");
715 if ((!avgcount) || (!sum))
717 avgcounti = json_object_get_int(avgcount);
718 DEBUG("avgcounti:%ld",avgcounti);
721 sumd = json_object_get_double(sum);
722 DEBUG("sumd:%lf",sumd);
723 double last_avg = get_last_avg(dset_name, ds_name, sumd, avgcounti);
724 uv->gauge = last_avg;
725 DEBUG("uv->gauge = (sumd_now - sumd_last) / (avgcounti_now - avgcounti_last) = :%lf",uv->gauge);
727 else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE)
729 /* We use json_object_get_double here because anything > 32
730 * bits may get truncated by json_object_get_int */
731 uv->derive = (uint64_t) json_object_get_double(jo);
732 DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive);
736 uv->gauge = json_object_get_double(jo);
737 DEBUG("uv->gauge %lf",uv->gauge);
742 /******* network I/O *******/
// Enumerators of the connection state machine (CSTATE_*) and of the request
// kinds sent over the admin socket (ASOK_REQ_*).
// NOTE(review): the enum headers and the remaining enumerators used in this
// file (CSTATE_READ_VERSION, CSTATE_READ_AMT, CSTATE_READ_JSON, ASOK_REQ_DATA,
// ASOK_REQ_SCHEMA) are not visible in this listing.
745 CSTATE_UNCONNECTED = 0,
746 CSTATE_WRITE_REQUEST,
754 ASOK_REQ_VERSION = 0,
// ASOK_REQ_NONE marks a connection whose request is finished or abandoned.
757 ASOK_REQ_NONE = 1000,
// Members of the per-connection state used by the cconn_* functions.
// NOTE(review): several member declarations (state, asok, amt, json_len, json)
// are not visible in this listing; only their comments remain.
762 /** The Ceph daemon that we're talking to */
763 struct ceph_daemon *d;
766 uint32_t request_type;
768 /** The connection state */
771 /** The socket we use to talk to this daemon */
774 /** The amount of data remaining to read / write. */
777 /** Length of the JSON to read */
780 /** Buffer containing JSON data */
// Open the connection for io: create a UNIX stream socket, connect it to the
// socket path, switch it to non-blocking mode and move the state from
// CSTATE_UNCONNECTED to CSTATE_WRITE_REQUEST.
// NOTE(review): the source argument of the path copy, the storing of fd into
// io, the error returns and the closing of fd on failure are not visible in
// this listing.
784 static int cconn_connect(struct cconn *io)
786 struct sockaddr_un address;
788 if (io->state != CSTATE_UNCONNECTED)
790 ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED");
793 fd = socket(PF_UNIX, SOCK_STREAM, 0);
797 ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: "
801 memset(&address, 0, sizeof(struct sockaddr_un));
802 address.sun_family = AF_UNIX;
803 snprintf(address.sun_path, sizeof(address.sun_path), "%s",
806 connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
809 ERROR("cconn_connect: connect(%d) failed: error %d", fd, err);
// The connect above is done in blocking mode; the socket becomes
// non-blocking only from here on.
813 flags = fcntl(fd, F_GETFL, 0);
814 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
817 ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err);
821 io->state = CSTATE_WRITE_REQUEST;
// Reset io to CSTATE_UNCONNECTED and close its socket, retrying the close
// when it is interrupted.
// NOTE(review): the guard around the close and the release of the remaining
// per-connection fields are not visible in this listing.
828 static void cconn_close(struct cconn *io)
830 io->state = CSTATE_UNCONNECTED;
834 RETRY_ON_EINTR(res, close(io->asok));
843 /* Process incoming JSON counter data */
844 /*static int cconn_process_data(struct cconn *io)
847 value_list_t vl = VALUE_LIST_INIT;
848 struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) +
849 (sizeof(value_t) * io->d->dset.ds_num));
853 vtmp->values_len = io->d->dset.ds_num;
854 ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
857 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
858 sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
859 sstrncpy(vl.type, io->d->dset.type, sizeof(vl.type));
860 vl.values = vtmp->values;
861 vl.values_len = vtmp->values_len;
862 DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
863 io->d->dset.type, vl.values_len, io->json);
864 ret = plugin_dispatch_values(&vl);
// Turn the JSON counter dump held in io->json into dispatched values.
// A values_tmp with one holder per data set is allocated, each holder gets a
// zeroed value_t array sized by the ds_num of its data set, the JSON is
// walked with node_handler_fetch_data to fill the arrays, and then one
// value_list_t per data set is dispatched with the daemon name as plugin
// instance. The done label frees the per-holder arrays.
// NOTE(review): allocation checks, the assignment of vtmp->d, the error jumps
// to done and the freeing of vtmp itself are not visible in this listing.
869 static int cconn_process_data(struct cconn *io)
872 struct values_tmp *vtmp = calloc(1,
873 sizeof(struct values_tmp)
874 + (sizeof(struct values_holder)) * io->d->dset_num);
877 for (i = 0; i < io->d->dset_num; i++)
879 value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
880 vtmp->vh[i].values = val;
881 vtmp->vh[i].values_len = io->d->dset[i].ds_num;
884 vtmp->holder_num = io->d->dset_num;
885 ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
888 for (i = 0; i < vtmp->holder_num; i++)
890 value_list_t vl = VALUE_LIST_INIT;
891 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
892 sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
// NOTE(review): this is the only copy here done with strncpy instead of
// sstrncpy; strncpy leaves the field unterminated when the name fills it.
893 strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
894 sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
// vl borrows the holder array; it is freed at the done label below.
895 vl.values = vtmp->vh[i].values;
896 vl.values_len = vtmp->vh[i].values_len;
897 DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
898 io->d->name, vl.values_len, io->json);
899 ret = plugin_dispatch_values(&vl);
904 done: for (i = 0; i < vtmp->holder_num; i++)
906 sfree(vtmp->vh[i].values);
// Dispatch the received JSON according to the request that produced it:
// counter data goes to cconn_process_data, a schema reply is walked with
// node_handler_define_schema to define the data sets of the daemon.
// NOTE(review): the case label before the first return and the default
// branch are not visible in this listing.
912 static int cconn_process_json(struct cconn *io)
914 switch (io->request_type)
917 return cconn_process_data(io);
918 case ASOK_REQ_SCHEMA:
919 return traverse_json(io->json, node_handler_define_schema, io->d);
// Check that the poll result revents fits the state of io: POLLERR is always
// an error, the write state needs POLLOUT and the three read states need
// POLLIN. The visible returns are 0 for a match and -EINVAL otherwise.
// NOTE(review): the return after the POLLERR message, the switch header and
// the default label are not visible in this listing.
925 static int cconn_validate_revents(struct cconn *io, int revents)
927 if (revents & POLLERR)
929 ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name);
934 case CSTATE_WRITE_REQUEST:
935 return (revents & POLLOUT) ? 0 : -EINVAL;
936 case CSTATE_READ_VERSION:
937 case CSTATE_READ_AMT:
938 case CSTATE_READ_JSON:
939 return (revents & POLLIN) ? 0 : -EINVAL;
// NOTE(review): this second return repeats the previous line and can never
// be reached.
940 return (revents & POLLIN) ? 0 : -EINVAL;
942 ERROR("cconn_validate_revents(name=%s) got to illegal state on line %d",
943 io->d->name, __LINE__);
948 /** Handle a network event for a connection */
// One step of the per-connection state machine, run after poll reported the
// socket ready:
//   CSTATE_WRITE_REQUEST  write the JSON command, then go to READ_VERSION
//                         (version request) or READ_AMT (other requests)
//   CSTATE_READ_VERSION   read the 32-bit version, accept only 1, then issue
//                         a schema request
//   CSTATE_READ_AMT       read the 32-bit JSON length and allocate the buffer
//   CSTATE_READ_JSON      read the JSON text, then process it
// io->amt counts the bytes transferred so far in the current state, so that
// partial reads and writes resume where they stopped.
// NOTE(review): the updates of io->amt, the handling of negative ret, the
// state resets between phases and all return statements are not visible in
// this listing.
949 static int cconn_handle_event(struct cconn *io)
954 case CSTATE_UNCONNECTED:
955 ERROR("cconn_handle_event(name=%s) got to illegal state on line %d",
956 io->d->name, __LINE__);
959 case CSTATE_WRITE_REQUEST:
962 /*snprintf(cmd, sizeof(cmd), "%s%d%s", "{\"prefix\":\"", io->request_type,
// NOTE(review): the buffer holds one digit plus the terminator, so a request
// type of 10 or more would be cut to its first digit by snprintf.
964 char req_type_str[2];
965 snprintf(req_type_str, sizeof(req_type_str), "%1.1d", io->request_type);
// The command is a JSON object with a single prefix member carrying the
// request type as a string.
966 json_object *cmd_object = json_object_new_object();
967 json_object_object_add(cmd_object, "prefix",
968 json_object_new_string(req_type_str));
969 const char *cmd_json = json_object_to_json_string(cmd_object);
970 /** we should send '\n' to server **/
971 snprintf(cmd, sizeof(cmd), "%s\n", cmd_json);
972 size_t cmd_len = strlen(cmd);
974 write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
975 DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
976 io->d->name, io->state, io->amt, ret);
// The whole command has been written once amt reaches its length.
980 if (io->amt >= cmd_len)
983 switch (io->request_type)
985 case ASOK_REQ_VERSION:
986 io->state = CSTATE_READ_VERSION;
989 io->state = CSTATE_READ_AMT;
// Dropping the reference releases the command object built above.
993 json_object_put(cmd_object);
996 case CSTATE_READ_VERSION:
999 read(io->asok, ((char*)(&io->d->version)) + io->amt,
1000 sizeof(io->d->version) - io->amt));
1001 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1002 io->d->name, io->state, ret);
1006 if (io->amt >= sizeof(io->d->version))
// The version arrives in network byte order.
1008 io->d->version = ntohl(io->d->version);
1009 if (io->d->version != 1)
1011 ERROR("cconn_handle_event(name=%s) not "
1012 "expecting version %d!", io->d->name, io->d->version);
1014 }DEBUG("cconn_handle_event(name=%s): identified as "
1015 "version %d", io->d->name, io->d->version);
// After the version check the same connection object goes on to request
// the schema.
1018 io->request_type = ASOK_REQ_SCHEMA;
1022 case CSTATE_READ_AMT:
1025 read(io->asok, ((char*)(&io->json_len)) + io->amt,
1026 sizeof(io->json_len) - io->amt));
1027 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1028 io->d->name, io->state, ret);
1032 if (io->amt >= sizeof(io->json_len))
// The length prefix arrives in network byte order; the buffer gets one extra
// zeroed byte so the JSON text is NUL-terminated.
1034 io->json_len = ntohl(io->json_len);
1036 io->state = CSTATE_READ_JSON;
1037 io->json = calloc(1, io->json_len + 1);
1043 case CSTATE_READ_JSON:
1046 read(io->asok, io->json + io->amt, io->json_len - io->amt));
1047 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1048 io->d->name, io->state, ret);
1052 if (io->amt >= io->json_len)
1054 ret = cconn_process_json(io);
// Marks the request as served so cconn_prepare stops polling this connection.
1058 io->request_type = ASOK_REQ_NONE;
1063 ERROR("cconn_handle_event(name=%s) got to illegal state on "
1064 "line %d", io->d->name, __LINE__);
// Fill one pollfd entry for io according to its state. An unconnected
// connection is connected first and then polled for POLLOUT; the write state
// polls for POLLOUT and the three read states poll for POLLIN. Connections
// whose request is ASOK_REQ_NONE, and data requests for a daemon without any
// data set, are singled out before the state switch.
// NOTE(review): the assignments of fds->fd and all return values are not
// visible in this listing; the caller polls the connection only for one
// particular return value.
1069 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
1072 if (io->request_type == ASOK_REQ_NONE)
1074 /* The request has already been serviced. */
1077 else if ((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
1079 /* If there are no counters to report on, don't bother
1086 case CSTATE_UNCONNECTED:
1087 ret = cconn_connect(io);
1093 fds->events = POLLOUT;
1095 case CSTATE_WRITE_REQUEST:
1097 fds->events = POLLOUT;
1099 case CSTATE_READ_VERSION:
1100 case CSTATE_READ_AMT:
1101 case CSTATE_READ_JSON:
1103 fds->events = POLLIN;
1106 ERROR("cconn_prepare(name=%s) got to illegal state on line %d",
1107 io->d->name, __LINE__);
/**
 * Returns the difference t1 - t2 between two struct timevals in milliseconds.
 *
 * The microsecond part is truncated toward zero to whole milliseconds.
 * All arithmetic is done in 64 bits so that neither the difference of the
 * seconds nor its multiplication by 1000 can overflow an int before the
 * range check; the previous int arithmetic overflowed (undefined behavior)
 * for differences above roughly 24 days.
 *
 * @param t1  minuend
 * @param t2  subtrahend
 * @return    t1 - t2 in milliseconds; INT_MAX or INT_MIN when the result
 *            does not fit in an int
 */
static int milli_diff(const struct timeval *t1, const struct timeval *t2)
{
  /* Beyond this many seconds the result cannot fit in an int anyway, and
   * staying below it keeps the 64-bit multiplication itself from
   * overflowing. */
  const int64_t sec_limit = INT64_MAX / 2000;
  int64_t sec_diff = (int64_t) t1->tv_sec - (int64_t) t2->tv_sec;
  int64_t usec_diff = (int64_t) t1->tv_usec - (int64_t) t2->tv_usec;
  int64_t ret;

  if (sec_diff > sec_limit)
    return INT_MAX;
  if (sec_diff < -sec_limit)
    return INT_MIN;

  ret = usec_diff / 1000;
  ret += sec_diff * 1000;

  if (ret > INT_MAX)
    return INT_MAX;
  else if (ret < INT_MIN)
    return INT_MIN;
  return (int) ret;
}
// Run one request of kind request_type against every configured daemon.
// A cconn is set up per daemon, a deadline CEPH_TIMEOUT_INTERVAL seconds from
// now is computed, and then the loop repeatedly (1) lets cconn_prepare fill a
// pollfd for every connection that still has work, (2) polls with the time
// left until the deadline and (3) validates and handles the reported events.
// A connection that fails in any step is marked ASOK_REQ_NONE and
// some_unreachable is set. At the done label all connections are closed.
// NOTE(review): the loop header, the tests on ret, the jumps to done and the
// return statements are not visible in this listing.
1129 /** This handles the actual network I/O to talk to the Ceph daemons.
1131 static int cconn_main_loop(uint32_t request_type)
1133 int i, ret, some_unreachable = 0;
1134 struct timeval end_tv;
// NOTE(review): io_array, polled_io_array and fds are variable-length arrays
// sized by g_num_daemons; a size of zero is undefined per the C standard
// unless a check that is not visible here rules it out.
1135 struct cconn io_array[g_num_daemons];
1137 DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
1139 /* create cconn array */
1140 memset(io_array, 0, sizeof(io_array));
1141 for (i = 0; i < g_num_daemons; ++i)
1143 io_array[i].d = g_daemons[i];
1144 io_array[i].request_type = request_type;
1145 io_array[i].state = CSTATE_UNCONNECTED;
1148 /** Calculate the time at which we should give up */
1149 gettimeofday(&end_tv, NULL);
1150 end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
// polled_io_array[k] is the connection that belongs to fds[k]; both are
// rebuilt on every pass because finished connections drop out.
1156 struct cconn *polled_io_array[g_num_daemons];
1157 struct pollfd fds[g_num_daemons];
1158 memset(fds, 0, sizeof(fds));
1160 for (i = 0; i < g_num_daemons; ++i)
1162 struct cconn *io = io_array + i;
1163 ret = cconn_prepare(io, fds + nfds);
1166 WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1167 io->d->name, i, io->state, ret);
1169 io->request_type = ASOK_REQ_NONE;
1170 some_unreachable = 1;
1174 DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
1175 io->d->name, i, io->state);
1176 polled_io_array[nfds++] = io_array + i;
1183 DEBUG("cconn_main_loop: no more cconn to manage.");
// Remaining time until the deadline, in milliseconds, used as poll timeout.
1186 gettimeofday(&tv, NULL);
1187 diff = milli_diff(&end_tv, &tv);
1192 WARNING("ERROR: cconn_main_loop: timed out.\n");
1195 RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1198 ERROR("poll(2) error: %d", ret);
1201 for (i = 0; i < nfds; ++i)
1203 struct cconn *io = polled_io_array[i];
1204 int revents = fds[i].revents;
1209 else if (cconn_validate_revents(io, revents))
1211 WARNING("ERROR: cconn(name=%s,i=%d,st=%d): "
1212 "revents validation error: "
1213 "revents=0x%08x", io->d->name, i, io->state, revents);
1215 io->request_type = ASOK_REQ_NONE;
1216 some_unreachable = 1;
// This inner ret shadows the ret declared at the top of the function.
1220 int ret = cconn_handle_event(io);
1223 WARNING("ERROR: cconn_handle_event(name=%s,"
1224 "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1226 io->request_type = ASOK_REQ_NONE;
1227 some_unreachable = 1;
1232 done: for (i = 0; i < g_num_daemons; ++i)
1234 cconn_close(io_array + i);
1236 if (some_unreachable)
1238 DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
1242 DEBUG("cconn_main_loop: reached all Ceph daemons :)");
// Read callback registered in module_register: runs one data request against
// every configured daemon and returns the result of cconn_main_loop.
1247 static int ceph_read(void)
1249 return cconn_main_loop(ASOK_REQ_DATA);
1252 /******* lifecycle *******/
// Init callback registered in module_register. It logs the configured
// daemons, runs a version request against each of them (cconn_handle_event
// follows a successful version read with a schema request, which defines the
// data sets) and then registers every data set of every daemon.
// NOTE(review): the handling of ret after the main loop and after a failed
// registration, and the return statements, are not visible in this listing.
1253 static int ceph_init(void)
1257 ceph_daemons_print();
1259 ret = cconn_main_loop(ASOK_REQ_VERSION);
1262 for (i = 0; i < g_num_daemons; ++i)
1264 struct ceph_daemon *d = g_daemons[i];
1265 for (j = 0; j < d->dset_num; j++)
1267 ret = plugin_register_data_set(d->dset + j);
1270 ERROR("plugin_register_data_set(%s) failed!", d->name);
1274 DEBUG("plugin_register_data_set(%s): "
1275 "(d->dset)[%d]->ds_num=%d",
1276 d->name, j, d->dset[j].ds_num);
// Shutdown callback registered in module_register. It releases every daemon
// through ceph_daemon_free, then frees each record of last_poll_data and the
// pointer array itself.
// NOTE(review): the release of g_daemons itself, the resetting of the
// counters and the return statement are not visible in this listing.
1283 static int ceph_shutdown(void)
1286 for (i = 0; i < g_num_daemons; ++i)
1288 ceph_daemon_free(g_daemons[i]);
1293 for(i = 0; i < last_idx; i++)
1295 sfree(last_poll_data[i]);
1297 sfree(last_poll_data);
1298 last_poll_data = NULL;
1300 DEBUG("finished ceph_shutdown");
// Plugin entry point: registers the config, init, read and shutdown callbacks
// of this file under the plugin name "ceph".
1304 void module_register(void)
1306 plugin_register_complex_config("ceph", ceph_config);
1307 plugin_register_init("ceph", ceph_init);
1308 plugin_register_read("ceph", ceph_read);
1309 plugin_register_shutdown("ceph", ceph_shutdown);