X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Ferlang.c;h=96f77aee5ba4b7b0c5339c28721c961ac5be6a77;hb=43ea89b8f320b54cc6d1bb05c5cc13488bb438c9;hp=8317d82cda8edbec7ac4abbc6d49fd40f12ee1a6;hpb=a53e5184f3f6f801e46f1f0a0d4b0d0a566657f2;p=collectd.git diff --git a/src/erlang.c b/src/erlang.c index 8317d82c..96f77aee 100644 --- a/src/erlang.c +++ b/src/erlang.c @@ -20,6 +20,7 @@ **/ #include "collectd.h" +#include "common.h" #include "plugin.h" #include @@ -42,18 +43,58 @@ struct ce_connection_info_s }; typedef struct ce_connection_info_s ce_connection_info_t; +struct ce_callback_info_s +{ + int fd; + ETERM *fun; +}; +typedef struct ce_callback_info_s ce_callback_info_t; + /* * Private variables */ static pthread_t listen_thread_id; static _Bool listen_thread_running = false; +static const char *config_keys[] = +{ + "BindTo", + "BindPort", + "Cookie", + "NodeName" +}; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); + +static char conf_node[NI_MAXHOST] = ""; static char conf_service[NI_MAXSERV] = "29157"; static char conf_cookie[256] = "ceisaequ"; +static char conf_hostname[256] = "alyja"; +static char conf_nodename[256] = "collectd"; +static char conf_fullname[256] = "collectd@alyja.office.noris.de"; + +static int connection_counter = 1; +static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER; /* * Private functions */ +static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */ +{ + if (ci == NULL) + return; + + if (ci->fd >= 0) + { + erl_close_connection (ci->fd); + ci->fd = -1; + } + + if (ci->fun != NULL) + erl_free_compound (ci->fun); + + free (ci); +} /* }}} void ce_free_callback_info */ + static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */ { ETERM *reply; @@ -86,7 +127,7 @@ static int send_error (int fd, ETERM *to, const char *message) /* {{{ */ else status = 0; - erl_free_term (reply); + erl_free_compound (reply); return (status); } /* }}} int send_error */ @@ -110,13 +151,17 @@ static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */ *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5); break; +#ifdef ERL_LONGLONG case ERL_LONGLONG: *ret_int = (int) ERL_LL_VALUE (term); break; +#endif /* ERL_LONGLONG */ +#ifdef ERL_U_LONGLONG case ERL_U_LONGLONG: *ret_int = (int) ERL_LL_UVALUE (term); break; +#endif /* ERL_U_LONGLONG */ default: ERROR ("erlang plugin: Don't know how to cast " @@ -169,13 +214,17 @@ static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */ *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5); break; +#ifdef ERL_LONGLONG case ERL_LONGLONG: *ret_time = (time_t) ERL_LL_VALUE (term); break; +#endif /* ERL_LONGLONG */ +#ifdef ERL_U_LONGLONG case ERL_U_LONGLONG: *ret_time = (time_t) ERL_LL_UVALUE (term); break; +#endif /* ERL_U_LONGLONG */ default: ERROR ("erlang plugin: Don't know how to cast " @@ -264,6 +313,7 @@ static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */ break; } +#ifdef ERL_LONGLONG case ERL_LONGLONG: { long long v = ERL_LL_VALUE (term); @@ -276,7 +326,9 @@ static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */ } break; } +#endif /* ERL_LONGLONG */ +#ifdef ERL_U_LONGLONG case ERL_U_LONGLONG: { unsigned long long v = ERL_LL_UVALUE (term); @@ -289,6 +341,7 @@ static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */ } break; } +#endif /* ERL_U_LONGLONG */ default: ERROR ("erlang plugin: Don't know how to cast " @@ -442,6 +495,44 @@ static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */ return (0); } /* }}} int eterm_to_value_list */ +static int ce_read (user_data_t *ud) /* {{{ */ +{ + ce_callback_info_t *ci; + ETERM *rpc_args; + ETERM *rpc_reply; + + if ((ud == NULL) || (ud->data == NULL)) + return (-1); + + ci = ud->data; + + rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun)); + if (rpc_args == NULL) + { + ERROR ("erlang plugin: erl_format failed."); + return (-1); + } + + DEBUG ("erlang plugin: Making remote procedure call ..."); + rpc_reply = erl_rpc (ci->fd, + /* module = */ "erlang", /* function = */ "apply", + /* arguments = */ rpc_args); + DEBUG ("erlang plugin: ... done."); + erl_free_compound (rpc_args); + if (rpc_reply == NULL) + { + char errbuf[1024]; + ERROR ("erlang plugin: erl_rpc failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + return (-1); + } + + /* FIXME: The return value is not yet used. */ + erl_free_compound (rpc_reply); + + return (0); +} /* }}} int ce_read */ + /* Returns non-zero only if the request could not be handled gracefully. */ static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ const ErlMessage *req) @@ -460,22 +551,112 @@ static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ if (status != 0) { free (vl.values); - send_error (cinfo->fd, req->from, "Cannot parse argument as value list."); - return (0); + status = send_error (cinfo->fd, req->from, "Cannot parse argument as value list."); + return (status); } status = plugin_dispatch_values (&vl); if (status != 0) { free (vl.values); - send_error (cinfo->fd, req->from, "plugin_dispatch_values failed."); - return (0); + status = send_error (cinfo->fd, req->from, "plugin_dispatch_values failed."); + return (status); } free (vl.values); - send_atom (cinfo->fd, req->from, "success"); + status = send_atom (cinfo->fd, req->from, "success"); - return (0); + return (status); +} /* }}} int handle_dispatch_values */ + +/* Returns non-zero only if the request could not be handled gracefully. */ +static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */ + const ErlMessage *req) +{ + ETERM *eterm_cb; + ce_callback_info_t *ci; + user_data_t ud; + int status; + int connection_number; + char callback_name[64]; + + if ((cinfo == NULL) || (req == NULL)) + return (EINVAL); + + eterm_cb = erl_element (2, req->msg); + + if (ERL_TYPE (eterm_cb) != ERL_FUNCTION) + { + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, + "Argument to `register_read' must be a callback function."); + return (status); + } + + ci = malloc (sizeof (ci)); + if (ci == NULL) + { + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "malloc failed."); + return (status); + } + + /* Lock around `erl_connect_init' and `erl_connect'. */ + pthread_mutex_lock (&connection_lock); + + connection_number = connection_counter; + connection_counter++; + + /* Create a new `cnode' for each connection. Otherwise we cannot determine + * which RPC call a message belongs to. */ + status = erl_connect_init (connection_number, conf_cookie, + /* creation = */ 0); + if (!status) /* Yes, it's this way around in this case ... {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect_init failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + ci->fd = erl_connect (cinfo->conn.nodename); + if (ci->fd < 0) /* {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect(%s) failed: %s", + cinfo->conn.nodename, + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + pthread_mutex_unlock (&connection_lock); + + ci->fun = eterm_cb; + + memset (&ud, 0, sizeof (ud)); + ud.data = ci; + ud.free_func = (void (*) (void *)) ce_free_callback_info; + + ssnprintf (callback_name, sizeof (callback_name), "erlang:%i", + connection_number); + + status = plugin_register_complex_read (callback_name, + ce_read, /* interval = */ NULL, &ud); + if (status == 0) + status = send_atom (cinfo->fd, req->from, "success"); + else + status = send_error (cinfo->fd, req->from, + "plugin_register_complex_read failed."); + + return (status); } /* }}} int handle_dispatch_values */ static void *handle_client_thread (void *arg) /* {{{ */ @@ -536,6 +717,8 @@ static void *handle_client_thread (void *arg) /* {{{ */ reply = NULL; if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0) status = handle_dispatch_values (cinfo, &emsg); + else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0) + status = handle_register_read (cinfo, &emsg); else { ERROR ("erlang plugin: Received request for invalid function `%s'.", @@ -627,9 +810,9 @@ static int create_listen_socket (void) /* {{{ */ /* Dunno if calling this multiple times is legal. Since it wants to have * the sin_addr for some reason this is the best place to call this, * though. -octo */ - status = erl_connect_xinit (/* host name = */ "leeloo", - /* plain node name = */ "collectd", - /* full node name = */ "collectd@leeloo.lan.home.verplant.org", + status = erl_connect_xinit (/* host name = */ conf_hostname, + /* plain node name = */ conf_nodename, + /* full node name = */ conf_fullname, /* our address = */ sin_addr, /* secret cookie = */ conf_cookie, /* instance number = */ 0); @@ -686,13 +869,21 @@ static int create_listen_socket (void) /* {{{ */ status = erl_publish (numeric_serv); if (status < 0) { - ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status); + ERROR ("erlang plugin: erl_publish (%i) failed with status %i. " + "Is epmd running?", numeric_serv, status); close (sock_descr); sock_descr = -1; return (-1); } } + if (sock_descr >= 0) + { + INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, " + "Cookie %s.", + conf_fullname, numeric_serv, conf_cookie); + } + return (sock_descr); } /* }}} int create_listen_socket */ @@ -720,9 +911,10 @@ void *listen_thread (void *arg) /* {{{ */ fd = erl_accept (listen, &conn); if (fd < 0) { - ERROR ("erlang plugin: erl_accept failed with status %i.", fd); - close (listen); - exit (EXIT_FAILURE); + char errbuf[1024]; + ERROR ("erlang plugin: erl_accept failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + continue; } DEBUG ("erlang plugin: Got connection from %s on fd %i.", conn.nodename, fd); @@ -766,8 +958,56 @@ static int ce_init (void) /* {{{ */ return (0); } /* }}} int ce_init */ +static int ce_config (const char *key, const char *value) /* {{{ */ +{ + if (strcasecmp ("BindTo", key) == 0) + { + sstrncpy (conf_node, value, sizeof (conf_node)); + } + else if (strcasecmp ("BindPort", key) == 0) + { + sstrncpy (conf_service, value, sizeof (conf_service)); + } + else if (strcasecmp ("Cookie", key) == 0) + { + sstrncpy (conf_cookie, value, sizeof (conf_cookie)); + } + else if (strcasecmp ("NodeName", key) == 0) + { + const char *host; + + host = strchr (value, '@'); + if (host == NULL) + { + sstrncpy (conf_nodename, value, sizeof (conf_nodename)); + sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname)); + ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s", + conf_nodename, conf_hostname); + } + else /* if (host != NULL) */ + { + char *tmp; + + sstrncpy (conf_nodename, value, sizeof (conf_nodename)); + sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname)); + sstrncpy (conf_fullname, value, sizeof (conf_fullname)); + + tmp = strchr (conf_nodename, '@'); + if (tmp != NULL) + *tmp = 0; + } + } + else + { + return (-1); + } + + return (0); +} /* }}} int ce_config */ + void module_register (void) { + plugin_register_config ("erlang", ce_config, config_keys, config_keys_num); plugin_register_init ("erlang", ce_init); }