X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Ferlang.c;h=96f77aee5ba4b7b0c5339c28721c961ac5be6a77;hb=43ea89b8f320b54cc6d1bb05c5cc13488bb438c9;hp=4b488b4edf5480dc7442c45210bc633baa987348;hpb=71f9dc1cf6d6aa8d5dd52e89588771bfe57c90a9;p=collectd.git diff --git a/src/erlang.c b/src/erlang.c index 4b488b4e..96f77aee 100644 --- a/src/erlang.c +++ b/src/erlang.c @@ -20,6 +20,7 @@ **/ #include "collectd.h" +#include "common.h" #include "plugin.h" #include @@ -42,18 +43,58 @@ struct ce_connection_info_s }; typedef struct ce_connection_info_s ce_connection_info_t; +struct ce_callback_info_s +{ + int fd; + ETERM *fun; +}; +typedef struct ce_callback_info_s ce_callback_info_t; + /* * Private variables */ static pthread_t listen_thread_id; static _Bool listen_thread_running = false; +static const char *config_keys[] = +{ + "BindTo", + "BindPort", + "Cookie", + "NodeName" +}; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); + +static char conf_node[NI_MAXHOST] = ""; static char conf_service[NI_MAXSERV] = "29157"; static char conf_cookie[256] = "ceisaequ"; +static char conf_hostname[256] = "alyja"; +static char conf_nodename[256] = "collectd"; +static char conf_fullname[256] = "collectd@alyja.office.noris.de"; + +static int connection_counter = 1; +static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER; /* * Private functions */ +static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */ +{ + if (ci == NULL) + return; + + if (ci->fd >= 0) + { + erl_close_connection (ci->fd); + ci->fd = -1; + } + + if (ci->fun != NULL) + erl_free_compound (ci->fun); + + free (ci); +} /* }}} void ce_free_callback_info */ + static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */ { ETERM *reply; @@ -86,38 +127,536 @@ static int send_error (int fd, ETERM *to, const char *message) /* {{{ */ else status = 0; - erl_free_term (reply); + erl_free_compound (reply); return (status); } /* }}} int send_error */ -/* Returns non-zero only if the request could not be handled gracefully. */ -static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ - const ErlMessage *req) +static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */ +{ + if ((term == NULL) || (ret_int == NULL)) + return (EINVAL); + + switch (ERL_TYPE (term)) + { + case ERL_INTEGER: + *ret_int = (int) ERL_INT_VALUE (term); + break; + + case ERL_U_INTEGER: + *ret_int = (int) ERL_INT_UVALUE (term); + break; + + case ERL_FLOAT: + *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5); + break; + +#ifdef ERL_LONGLONG + case ERL_LONGLONG: + *ret_int = (int) ERL_LL_VALUE (term); + break; +#endif /* ERL_LONGLONG */ + +#ifdef ERL_U_LONGLONG + case ERL_U_LONGLONG: + *ret_int = (int) ERL_LL_UVALUE (term); + break; +#endif /* ERL_U_LONGLONG */ + + default: + ERROR ("erlang plugin: Don't know how to cast " + "erlang type %#x to int.", (unsigned int) ERL_TYPE (term)); + return (ENOTSUP); + } /* switch (ERL_TYPE (term)) */ + + return (0); +} /* }}} int eterm_to_int */ + +static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */ { - ETERM *vl; + if ((term == NULL) || (ret_time == NULL)) + return (EINVAL); - vl = erl_element (2, req->msg); - if ((vl == NULL) || !ERL_IS_TUPLE (vl)) + if (ERL_IS_NIL (term) + || (ERL_IS_ATOM (term) + && ((strcmp ("now", ERL_ATOM_PTR (term)) == 0) + || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0)))) { - erl_free_term (vl); - send_error (cinfo->fd, req->from, "Invalid format: VL not a tubple."); + *ret_time = time (NULL); return (0); } - /* We need: Identifier (5 parts), time, interval, values - * => 8 parts */ - if (ERL_TUPLE_SIZE (vl) != 8) + switch (ERL_TYPE (term)) { - erl_free_term (vl); - send_error (cinfo->fd, req->from, "Invalid format: " - "VL needs eight components."); + case ERL_INTEGER: + *ret_time = (time_t) ERL_INT_VALUE (term); + break; + + case ERL_U_INTEGER: + *ret_time = (time_t) ERL_INT_UVALUE (term); + break; + + case ERL_ATOM: + if ((strcmp ("now", ERL_ATOM_PTR (term)) == 0) + || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0)) + { + *ret_time = time (NULL); + } + else + { + ERROR ("erlang plugin: Invalid atom for time: %s.", + ERL_ATOM_PTR (term)); + return (ENOTSUP); + } + break; + + case ERL_FLOAT: + *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5); + break; + +#ifdef ERL_LONGLONG + case ERL_LONGLONG: + *ret_time = (time_t) ERL_LL_VALUE (term); + break; +#endif /* ERL_LONGLONG */ + +#ifdef ERL_U_LONGLONG + case ERL_U_LONGLONG: + *ret_time = (time_t) ERL_LL_UVALUE (term); + break; +#endif /* ERL_U_LONGLONG */ + + default: + ERROR ("erlang plugin: Don't know how to cast " + "erlang type %#x to time_t.", (unsigned int) ERL_TYPE (term)); + return (ENOTSUP); + } /* switch (ERL_TYPE (term)) */ + + return (0); +} /* }}} int eterm_to_time_t */ + +static int eterm_to_string (const ETERM *term, char *buffer, size_t buffer_size) /* {{{ */ +{ + char *tmp; + + if ((term == NULL) || (buffer == NULL) || (buffer_size <= 0)) + return (EINVAL); + + memset (buffer, 0, buffer_size); + + if (ERL_IS_EMPTY_LIST (term) + || ERL_IS_NIL (term) + || (ERL_IS_ATOM (term) + && (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))) + { + buffer[0] = 0; return (0); } - send_atom (cinfo->fd, req->from, "success"); + if (!ERL_IS_LIST (term)) + return (-1); + + tmp = erl_iolist_to_string (term); + if (tmp == NULL) + return (-1); + + strncpy (buffer, tmp, buffer_size - 1); + erl_free (tmp); + + return (0); +} /* }}} int eterm_to_string */ + +static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */ + value_t *value) +{ + if ((term == NULL) || (value == NULL)) + return (EINVAL); + + switch (ERL_TYPE (term)) + { + case ERL_INTEGER: + { + int v = ERL_INT_VALUE (term); + switch (ds_type) + { + case DS_TYPE_COUNTER: value->counter = (counter_t) v; break; + case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break; + case DS_TYPE_DERIVE: value->derive = (derive_t) v; break; + case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break; + } + break; + } + + case ERL_U_INTEGER: + { + unsigned int v = ERL_INT_UVALUE (term); + switch (ds_type) + { + case DS_TYPE_COUNTER: value->counter = (counter_t) v; break; + case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break; + case DS_TYPE_DERIVE: value->derive = (derive_t) v; break; + case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break; + } + break; + } + + case ERL_FLOAT: + { + double v = ERL_FLOAT_VALUE (term); + switch (ds_type) + { + case DS_TYPE_COUNTER: value->counter = (counter_t) v; break; + case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break; + case DS_TYPE_DERIVE: value->derive = (derive_t) v; break; + case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break; + } + break; + } + +#ifdef ERL_LONGLONG + case ERL_LONGLONG: + { + long long v = ERL_LL_VALUE (term); + switch (ds_type) + { + case DS_TYPE_COUNTER: value->counter = (counter_t) v; break; + case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break; + case DS_TYPE_DERIVE: value->derive = (derive_t) v; break; + case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break; + } + break; + } +#endif /* ERL_LONGLONG */ + +#ifdef ERL_U_LONGLONG + case ERL_U_LONGLONG: + { + unsigned long long v = ERL_LL_UVALUE (term); + switch (ds_type) + { + case DS_TYPE_COUNTER: value->counter = (counter_t) v; break; + case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break; + case DS_TYPE_DERIVE: value->derive = (derive_t) v; break; + case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break; + } + break; + } +#endif /* ERL_U_LONGLONG */ + + default: + ERROR ("erlang plugin: Don't know how to cast " + "erlang type %#x to value_t.", (unsigned int) ERL_TYPE (term)); + return (ENOTSUP); + } /* switch (ERL_TYPE (term)) */ + + return (0); +} /* }}} int eterm_to_value */ + +static int eterm_to_values (const ETERM *term, const data_set_t *ds, /* {{{ */ + value_list_t *vl) +{ + int ds_index; + int status; + + if ((term == NULL) || (ds == NULL) || (vl == NULL)) + return (EINVAL); + + if (!ERL_IS_LIST (term)) + return (-1); + + free (vl->values); + vl->values = NULL; + vl->values_len = 0; + + while (!ERL_IS_EMPTY_LIST (term)) + { + const ETERM *eterm_value; + value_t *tmp; + + if (ds_index >= ds->ds_num) + { + ds_index = ds->ds_num + 1; + status = 0; + break; + } + + tmp = realloc (vl->values, sizeof (*tmp) * (vl->values_len + 1)); + if (tmp == NULL) + { + status = ENOMEM; + break; + } + vl->values = tmp; + + eterm_value = ERL_CONS_HEAD (term); + term = ERL_CONS_TAIL (term); + + status = eterm_to_value (eterm_value, ds->ds[ds_index].type, + vl->values + vl->values_len); + if (status != 0) + break; + + vl->values_len++; + ds_index++; + } + + if ((status == 0) && (ds_index != ds->ds_num)) + NOTICE ("erlang plugin: Incorrect number of values received for type %s: " + "Expected %i, got %i.", ds->type, ds->ds_num, ds_index); + + if ((status != 0) || (ds_index != ds->ds_num)) + { + free (vl->values); + vl->values = NULL; + vl->values_len = 0; + return (status); + } + + return (0); +} /* }}} int eterm_to_values */ + +static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */ +{ + ETERM *tmp; + int status; + const data_set_t *ds; + + if ((term == NULL) || (vl == NULL)) + return (EINVAL); + + if (!ERL_IS_TUPLE (term) || (ERL_TUPLE_SIZE (term) != 9)) + return (EINVAL); + + tmp = erl_element (1, term); + if (!ERL_IS_ATOM (tmp) + || (strcmp ("value_list", ERL_ATOM_PTR (tmp)) != 0)) + { + erl_free_term (tmp); + return (-1); + } + erl_free_term (tmp); + + status = 0; + do + { +#define TUPLE_ELEM_TO_CHAR_ARRAY(idx,buf) \ + tmp = erl_element ((idx), term); \ + status = eterm_to_string (tmp, (buf), sizeof (buf)); \ + erl_free_term (tmp); \ + if (status != 0) \ + break; + + TUPLE_ELEM_TO_CHAR_ARRAY (2, vl->host); + TUPLE_ELEM_TO_CHAR_ARRAY (3, vl->plugin); + TUPLE_ELEM_TO_CHAR_ARRAY (4, vl->plugin_instance); + TUPLE_ELEM_TO_CHAR_ARRAY (5, vl->type); + TUPLE_ELEM_TO_CHAR_ARRAY (6, vl->type_instance); + + ds = plugin_get_ds (vl->type); + if (ds == NULL) + { + status = -1; + break; + } + + tmp = erl_element (7, term); + status = eterm_to_time_t (tmp, &vl->time); + erl_free_term (tmp); + if (status != 0) + break; + + tmp = erl_element (8, term); + status = eterm_to_int (tmp, &vl->interval); + erl_free_term (tmp); + if (status != 0) + break; + if (vl->interval < 1) + vl->interval = interval_g; + + tmp = erl_element (9, term); + status = eterm_to_values (tmp, ds, vl); + erl_free_term (tmp); + if (status != 0) + break; + +#undef TUPLE_ELEM_TO_CHAR_ARRAY + } while (0); + + if (status != 0) + return (status); + + /* validate the struct */ + if ((vl->host[0] == 0) || (vl->plugin[0] == 0) || (vl->type[0] == 0)) + return (-1); + + if (ds->ds_num != vl->values_len) + return (-1); + + return (0); +} /* }}} int eterm_to_value_list */ + +static int ce_read (user_data_t *ud) /* {{{ */ +{ + ce_callback_info_t *ci; + ETERM *rpc_args; + ETERM *rpc_reply; + + if ((ud == NULL) || (ud->data == NULL)) + return (-1); + + ci = ud->data; + + rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun)); + if (rpc_args == NULL) + { + ERROR ("erlang plugin: erl_format failed."); + return (-1); + } + + DEBUG ("erlang plugin: Making remote procedure call ..."); + rpc_reply = erl_rpc (ci->fd, + /* module = */ "erlang", /* function = */ "apply", + /* arguments = */ rpc_args); + DEBUG ("erlang plugin: ... done."); + erl_free_compound (rpc_args); + if (rpc_reply == NULL) + { + char errbuf[1024]; + ERROR ("erlang plugin: erl_rpc failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + return (-1); + } + + /* FIXME: The return value is not yet used. */ + erl_free_compound (rpc_reply); return (0); +} /* }}} int ce_read */ + +/* Returns non-zero only if the request could not be handled gracefully. */ +static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ + const ErlMessage *req) +{ + ETERM *eterm_vl; + value_list_t vl; + int status; + + memset (&vl, 0, sizeof (vl)); + vl.values = NULL; + + eterm_vl = erl_element (2, req->msg); + status = eterm_to_value_list (eterm_vl, &vl); + erl_free_term (eterm_vl); + + if (status != 0) + { + free (vl.values); + status = send_error (cinfo->fd, req->from, "Cannot parse argument as value list."); + return (status); + } + + status = plugin_dispatch_values (&vl); + if (status != 0) + { + free (vl.values); + status = send_error (cinfo->fd, req->from, "plugin_dispatch_values failed."); + return (status); + } + + free (vl.values); + status = send_atom (cinfo->fd, req->from, "success"); + + return (status); +} /* }}} int handle_dispatch_values */ + +/* Returns non-zero only if the request could not be handled gracefully. */ +static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */ + const ErlMessage *req) +{ + ETERM *eterm_cb; + ce_callback_info_t *ci; + user_data_t ud; + int status; + int connection_number; + char callback_name[64]; + + if ((cinfo == NULL) || (req == NULL)) + return (EINVAL); + + eterm_cb = erl_element (2, req->msg); + + if (ERL_TYPE (eterm_cb) != ERL_FUNCTION) + { + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, + "Argument to `register_read' must be a callback function."); + return (status); + } + + ci = malloc (sizeof (ci)); + if (ci == NULL) + { + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "malloc failed."); + return (status); + } + + /* Lock around `erl_connect_init' and `erl_connect'. */ + pthread_mutex_lock (&connection_lock); + + connection_number = connection_counter; + connection_counter++; + + /* Create a new `cnode' for each connection. Otherwise we cannot determine + * which RPC call a message belongs to. */ + status = erl_connect_init (connection_number, conf_cookie, + /* creation = */ 0); + if (!status) /* Yes, it's this way around in this case ... {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect_init failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + ci->fd = erl_connect (cinfo->conn.nodename); + if (ci->fd < 0) /* {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect(%s) failed: %s", + cinfo->conn.nodename, + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + pthread_mutex_unlock (&connection_lock); + + ci->fun = eterm_cb; + + memset (&ud, 0, sizeof (ud)); + ud.data = ci; + ud.free_func = (void (*) (void *)) ce_free_callback_info; + + ssnprintf (callback_name, sizeof (callback_name), "erlang:%i", + connection_number); + + status = plugin_register_complex_read (callback_name, + ce_read, /* interval = */ NULL, &ud); + if (status == 0) + status = send_atom (cinfo->fd, req->from, "success"); + else + status = send_error (cinfo->fd, req->from, + "plugin_register_complex_read failed."); + + return (status); } /* }}} int handle_dispatch_values */ static void *handle_client_thread (void *arg) /* {{{ */ @@ -178,6 +717,8 @@ static void *handle_client_thread (void *arg) /* {{{ */ reply = NULL; if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0) status = handle_dispatch_values (cinfo, &emsg); + else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0) + status = handle_register_read (cinfo, &emsg); else { ERROR ("erlang plugin: Received request for invalid function `%s'.", @@ -269,9 +810,9 @@ static int create_listen_socket (void) /* {{{ */ /* Dunno if calling this multiple times is legal. Since it wants to have * the sin_addr for some reason this is the best place to call this, * though. -octo */ - status = erl_connect_xinit (/* host name = */ "leeloo", - /* plain node name = */ "collectd", - /* full node name = */ "collectd@leeloo.lan.home.verplant.org", + status = erl_connect_xinit (/* host name = */ conf_hostname, + /* plain node name = */ conf_nodename, + /* full node name = */ conf_fullname, /* our address = */ sin_addr, /* secret cookie = */ conf_cookie, /* instance number = */ 0); @@ -328,13 +869,21 @@ static int create_listen_socket (void) /* {{{ */ status = erl_publish (numeric_serv); if (status < 0) { - ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status); + ERROR ("erlang plugin: erl_publish (%i) failed with status %i. " + "Is epmd running?", numeric_serv, status); close (sock_descr); sock_descr = -1; return (-1); } } + if (sock_descr >= 0) + { + INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, " + "Cookie %s.", + conf_fullname, numeric_serv, conf_cookie); + } + return (sock_descr); } /* }}} int create_listen_socket */ @@ -362,9 +911,10 @@ void *listen_thread (void *arg) /* {{{ */ fd = erl_accept (listen, &conn); if (fd < 0) { - ERROR ("erlang plugin: erl_accept failed with status %i.", fd); - close (listen); - exit (EXIT_FAILURE); + char errbuf[1024]; + ERROR ("erlang plugin: erl_accept failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + continue; } DEBUG ("erlang plugin: Got connection from %s on fd %i.", conn.nodename, fd); @@ -408,8 +958,56 @@ static int ce_init (void) /* {{{ */ return (0); } /* }}} int ce_init */ +static int ce_config (const char *key, const char *value) /* {{{ */ +{ + if (strcasecmp ("BindTo", key) == 0) + { + sstrncpy (conf_node, value, sizeof (conf_node)); + } + else if (strcasecmp ("BindPort", key) == 0) + { + sstrncpy (conf_service, value, sizeof (conf_service)); + } + else if (strcasecmp ("Cookie", key) == 0) + { + sstrncpy (conf_cookie, value, sizeof (conf_cookie)); + } + else if (strcasecmp ("NodeName", key) == 0) + { + const char *host; + + host = strchr (value, '@'); + if (host == NULL) + { + sstrncpy (conf_nodename, value, sizeof (conf_nodename)); + sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname)); + ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s", + conf_nodename, conf_hostname); + } + else /* if (host != NULL) */ + { + char *tmp; + + sstrncpy (conf_nodename, value, sizeof (conf_nodename)); + sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname)); + sstrncpy (conf_fullname, value, sizeof (conf_fullname)); + + tmp = strchr (conf_nodename, '@'); + if (tmp != NULL) + *tmp = 0; + } + } + else + { + return (-1); + } + + return (0); +} /* }}} int ce_config */ + void module_register (void) { + plugin_register_config ("erlang", ce_config, config_keys, config_keys_num); plugin_register_init ("erlang", ce_init); }