X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Ferlang.c;h=9728614098d0d416473e0b829a9c93ff69378ec6;hb=a4e4fad8a90a099257e1a26ffae0ef8cdf4f6704;hp=0389136a05e802b99867c68b179cccd6abdf67c3;hpb=28464f0c7553e221bf299967357dd36299ef735d;p=collectd.git diff --git a/src/erlang.c b/src/erlang.c index 0389136a..97286140 100644 --- a/src/erlang.c +++ b/src/erlang.c @@ -20,6 +20,7 @@ **/ #include "collectd.h" +#include "common.h" #include "plugin.h" #include @@ -42,21 +43,58 @@ struct ce_connection_info_s }; typedef struct ce_connection_info_s ce_connection_info_t; +struct ce_callback_info_s +{ + int fd; + ETERM *fun; +}; +typedef struct ce_callback_info_s ce_callback_info_t; + /* * Private variables */ static pthread_t listen_thread_id; static _Bool listen_thread_running = false; +static const char *config_keys[] = +{ + "BindTo", + "BindPort", + "Cookie", + "NodeName" +}; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); + +static char conf_node[NI_MAXHOST] = ""; static char conf_service[NI_MAXSERV] = "29157"; static char conf_cookie[256] = "ceisaequ"; static char conf_hostname[256] = "alyja"; static char conf_nodename[256] = "collectd"; static char conf_fullname[256] = "collectd@alyja.office.noris.de"; +static int connection_counter = 1; +static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER; + /* * Private functions */ +static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */ +{ + if (ci == NULL) + return; + + if (ci->fd >= 0) + { + erl_close_connection (ci->fd); + ci->fd = -1; + } + + if (ci->fun != NULL) + erl_free_compound (ci->fun); + + free (ci); +} /* }}} void ce_free_callback_info */ + static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */ { ETERM *reply; @@ -457,6 +495,44 @@ static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */ return (0); } /* }}} int eterm_to_value_list */ +static int ce_read (user_data_t *ud) /* {{{ */ +{ + ce_callback_info_t *ci; + ETERM *rpc_args; + ETERM *rpc_reply; + + if ((ud == NULL) || (ud->data == NULL)) + return (-1); + + ci = ud->data; + + rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun)); + if (rpc_args == NULL) + { + ERROR ("erlang plugin: erl_format failed."); + return (-1); + } + + DEBUG ("erlang plugin: Making remote procedure call ..."); + rpc_reply = erl_rpc (ci->fd, + /* module = */ "erlang", /* function = */ "apply", + /* arguments = */ rpc_args); + DEBUG ("erlang plugin: ... done."); + erl_free_compound (rpc_args); + if (rpc_reply == NULL) + { + char errbuf[1024]; + ERROR ("erlang plugin: erl_rpc failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + return (-1); + } + + /* FIXME: The return value is not yet used. */ + erl_free_compound (rpc_reply); + + return (0); +} /* }}} int ce_read */ + /* Returns non-zero only if the request could not be handled gracefully. */ static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ const ErlMessage *req) @@ -493,6 +569,96 @@ static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ return (0); } /* }}} int handle_dispatch_values */ +/* Returns non-zero only if the request could not be handled gracefully. */ +static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */ + const ErlMessage *req) +{ + ETERM *eterm_cb; + ce_callback_info_t *ci; + user_data_t ud; + int status; + int connection_number; + char callback_name[64]; + + if ((cinfo == NULL) || (req == NULL)) + return (EINVAL); + + eterm_cb = erl_element (2, req->msg); + + if (ERL_TYPE (eterm_cb) != ERL_FUNCTION) + { + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, + "Argument to `register_read' must be a callback function."); + return (status); + } + + ci = malloc (sizeof (ci)); + if (ci == NULL) + { + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "malloc failed."); + return (status); + } + + /* Lock around `erl_connect_init' and `erl_connect'. */ + pthread_mutex_lock (&connection_lock); + + connection_number = connection_counter; + connection_counter++; + + /* Create a new `cnode' for each connection. Otherwise we cannot determine + * which RPC call a message belongs to. */ + status = erl_connect_init (connection_number, conf_cookie, + /* creation = */ 0); + if (!status) /* Yes, it's this way around in this case ... {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect_init failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + ci->fd = erl_connect (cinfo->conn.nodename); + if (ci->fd < 0) /* {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect(%s) failed: %s", + cinfo->conn.nodename, + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + pthread_mutex_unlock (&connection_lock); + + ci->fun = eterm_cb; + + memset (&ud, 0, sizeof (ud)); + ud.data = ci; + ud.free_func = (void (*) (void *)) ce_free_callback_info; + + ssnprintf (callback_name, sizeof (callback_name), "erlang:%i", + connection_number); + + status = plugin_register_complex_read (callback_name, + ce_read, /* interval = */ NULL, &ud); + if (status == 0) + status = send_atom (cinfo->fd, req->from, "success"); + else + status = send_error (cinfo->fd, req->from, + "plugin_register_complex_read failed."); + + return (status); +} /* }}} int handle_dispatch_values */ + static void *handle_client_thread (void *arg) /* {{{ */ { ce_connection_info_t *cinfo; @@ -551,6 +717,8 @@ static void *handle_client_thread (void *arg) /* {{{ */ reply = NULL; if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0) status = handle_dispatch_values (cinfo, &emsg); + else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0) + status = handle_register_read (cinfo, &emsg); else { ERROR ("erlang plugin: Received request for invalid function `%s'.", @@ -701,13 +869,21 @@ static int create_listen_socket (void) /* {{{ */ status = erl_publish (numeric_serv); if (status < 0) { - ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status); + ERROR ("erlang plugin: erl_publish (%i) failed with status %i. " + "Is epmd running?", numeric_serv, status); close (sock_descr); sock_descr = -1; return (-1); } } + if (sock_descr >= 0) + { + INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, " + "Cookie %s.", + conf_fullname, numeric_serv, conf_cookie); + } + return (sock_descr); } /* }}} int create_listen_socket */ @@ -781,8 +957,56 @@ static int ce_init (void) /* {{{ */ return (0); } /* }}} int ce_init */ +static int ce_config (const char *key, const char *value) /* {{{ */ +{ + if (strcasecmp ("BindTo", key) == 0) + { + sstrncpy (conf_node, value, sizeof (conf_node)); + } + else if (strcasecmp ("BindPort", key) == 0) + { + sstrncpy (conf_service, value, sizeof (conf_service)); + } + else if (strcasecmp ("Cookie", key) == 0) + { + sstrncpy (conf_cookie, value, sizeof (conf_cookie)); + } + else if (strcasecmp ("NodeName", key) == 0) + { + const char *host; + + host = strchr (value, '@'); + if (host == NULL) + { + sstrncpy (conf_nodename, value, sizeof (conf_nodename)); + sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname)); + ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s", + conf_nodename, conf_hostname); + } + else /* if (host != NULL) */ + { + char *tmp; + + sstrncpy (conf_nodename, value, sizeof (conf_nodename)); + sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname)); + sstrncpy (conf_fullname, value, sizeof (conf_fullname)); + + tmp = strchr (conf_nodename, '@'); + if (tmp != NULL) + *tmp = 0; + } + } + else + { + return (-1); + } + + return (0); +} /* }}} int ce_config */ + void module_register (void) { + plugin_register_config ("erlang", ce_config, config_keys, config_keys_num); plugin_register_init ("erlang", ce_init); }