**/
#include "collectd.h"
+#include "common.h"
#include "plugin.h"
#include <sys/types.h>
};
typedef struct ce_connection_info_s ce_connection_info_t;
+struct ce_callback_info_s
+{
+ int fd;
+ ETERM *fun;
+};
+typedef struct ce_callback_info_s ce_callback_info_t;
+
/*
* Private variables
*/
static pthread_t listen_thread_id;
static _Bool listen_thread_running = false;
+static const char *config_keys[] =
+{
+ "BindTo",
+ "BindPort",
+ "Cookie",
+ "NodeName"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static char conf_node[NI_MAXHOST] = "";
static char conf_service[NI_MAXSERV] = "29157";
static char conf_cookie[256] = "ceisaequ";
+static char conf_hostname[256] = "alyja";
+static char conf_nodename[256] = "collectd";
+static char conf_fullname[256] = "collectd@alyja.office.noris.de";
+
+static int connection_counter = 1;
+static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
/*
* Private functions
*/
+static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */
+{
+ if (ci == NULL)
+ return;
+
+ if (ci->fd >= 0)
+ {
+ erl_close_connection (ci->fd);
+ ci->fd = -1;
+ }
+
+ if (ci->fun != NULL)
+ erl_free_compound (ci->fun);
+
+ free (ci);
+} /* }}} void ce_free_callback_info */
+
static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
{
ETERM *reply;
else
status = 0;
- erl_free_term (reply);
+ erl_free_compound (reply);
return (status);
} /* }}} int send_error */
*ret_int = (int) (ERL_FLOAT_VALUE (term) + .5);
break;
+#ifdef ERL_LONGLONG
case ERL_LONGLONG:
*ret_int = (int) ERL_LL_VALUE (term);
break;
+#endif /* ERL_LONGLONG */
+#ifdef ERL_U_LONGLONG
case ERL_U_LONGLONG:
*ret_int = (int) ERL_LL_UVALUE (term);
break;
+#endif /* ERL_U_LONGLONG */
default:
ERROR ("erlang plugin: Don't know how to cast "
*ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5);
break;
+#ifdef ERL_LONGLONG
case ERL_LONGLONG:
*ret_time = (time_t) ERL_LL_VALUE (term);
break;
+#endif /* ERL_LONGLONG */
+#ifdef ERL_U_LONGLONG
case ERL_U_LONGLONG:
*ret_time = (time_t) ERL_LL_UVALUE (term);
break;
+#endif /* ERL_U_LONGLONG */
default:
ERROR ("erlang plugin: Don't know how to cast "
break;
}
+#ifdef ERL_LONGLONG
case ERL_LONGLONG:
{
long long v = ERL_LL_VALUE (term);
}
break;
}
+#endif /* ERL_LONGLONG */
+#ifdef ERL_U_LONGLONG
case ERL_U_LONGLONG:
{
unsigned long long v = ERL_LL_UVALUE (term);
}
break;
}
+#endif /* ERL_U_LONGLONG */
default:
ERROR ("erlang plugin: Don't know how to cast "
return (0);
} /* }}} int eterm_to_value_list */
+static int ce_read (user_data_t *ud) /* {{{ */
+{
+ ce_callback_info_t *ci;
+ ETERM *rpc_args;
+ ETERM *rpc_reply;
+
+ if ((ud == NULL) || (ud->data == NULL))
+ return (-1);
+
+ ci = ud->data;
+
+ rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun));
+ if (rpc_args == NULL)
+ {
+ ERROR ("erlang plugin: erl_format failed.");
+ return (-1);
+ }
+
+ DEBUG ("erlang plugin: Making remote procedure call ...");
+ rpc_reply = erl_rpc (ci->fd,
+ /* module = */ "erlang", /* function = */ "apply",
+ /* arguments = */ rpc_args);
+ DEBUG ("erlang plugin: ... done.");
+ erl_free_compound (rpc_args);
+ if (rpc_reply == NULL)
+ {
+ char errbuf[1024];
+ ERROR ("erlang plugin: erl_rpc failed: %s",
+ sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+ return (-1);
+ }
+
+ /* FIXME: The return value is not yet used. */
+ erl_free_compound (rpc_reply);
+
+ return (0);
+} /* }}} int ce_read */
+
/* Returns non-zero only if the request could not be handled gracefully. */
static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
const ErlMessage *req)
if (status != 0)
{
free (vl.values);
- send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
- return (0);
+ status = send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
+ return (status);
}
status = plugin_dispatch_values (&vl);
if (status != 0)
{
free (vl.values);
- send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
- return (0);
+ status = send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
+ return (status);
}
free (vl.values);
- send_atom (cinfo->fd, req->from, "success");
+ status = send_atom (cinfo->fd, req->from, "success");
- return (0);
+ return (status);
+} /* }}} int handle_dispatch_values */
+
+/* Returns non-zero only if the request could not be handled gracefully. */
+static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */
+ const ErlMessage *req)
+{
+ ETERM *eterm_cb;
+ ce_callback_info_t *ci;
+ user_data_t ud;
+ int status;
+ int connection_number;
+ char callback_name[64];
+
+ if ((cinfo == NULL) || (req == NULL))
+ return (EINVAL);
+
+ eterm_cb = erl_element (2, req->msg);
+
+ if (ERL_TYPE (eterm_cb) != ERL_FUNCTION)
+ {
+ erl_free_term (eterm_cb);
+ status = send_error (cinfo->fd, req->from,
+ "Argument to `register_read' must be a callback function.");
+ return (status);
+ }
+
+ ci = malloc (sizeof (ci));
+ if (ci == NULL)
+ {
+ erl_free_term (eterm_cb);
+ status = send_error (cinfo->fd, req->from, "malloc failed.");
+ return (status);
+ }
+
+ /* Lock around `erl_connect_init' and `erl_connect'. */
+ pthread_mutex_lock (&connection_lock);
+
+ connection_number = connection_counter;
+ connection_counter++;
+
+ /* Create a new `cnode' for each connection. Otherwise we cannot determine
+ * which RPC call a message belongs to. */
+ status = erl_connect_init (connection_number, conf_cookie,
+ /* creation = */ 0);
+ if (!status) /* Yes, it's this way around in this case ... {{{ */
+ {
+ char errbuf[1024];
+ pthread_mutex_unlock (&connection_lock);
+ ERROR ("erlang plugin: erl_connect_init failed: %s",
+ sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+ sfree (ci);
+ erl_free_term (eterm_cb);
+ status = send_error (cinfo->fd, req->from, "erl_connect failed.");
+ return (status);
+ } /* }}} */
+
+ ci->fd = erl_connect (cinfo->conn.nodename);
+ if (ci->fd < 0) /* {{{ */
+ {
+ char errbuf[1024];
+ pthread_mutex_unlock (&connection_lock);
+ ERROR ("erlang plugin: erl_connect(%s) failed: %s",
+ cinfo->conn.nodename,
+ sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+ sfree (ci);
+ erl_free_term (eterm_cb);
+ status = send_error (cinfo->fd, req->from, "erl_connect failed.");
+ return (status);
+ } /* }}} */
+
+ pthread_mutex_unlock (&connection_lock);
+
+ ci->fun = eterm_cb;
+
+ memset (&ud, 0, sizeof (ud));
+ ud.data = ci;
+ ud.free_func = (void (*) (void *)) ce_free_callback_info;
+
+ ssnprintf (callback_name, sizeof (callback_name), "erlang:%i",
+ connection_number);
+
+ status = plugin_register_complex_read (callback_name,
+ ce_read, /* interval = */ NULL, &ud);
+ if (status == 0)
+ status = send_atom (cinfo->fd, req->from, "success");
+ else
+ status = send_error (cinfo->fd, req->from,
+ "plugin_register_complex_read failed.");
+
+ return (status);
} /* }}} int handle_dispatch_values */
static void *handle_client_thread (void *arg) /* {{{ */
reply = NULL;
if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
status = handle_dispatch_values (cinfo, &emsg);
+ else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0)
+ status = handle_register_read (cinfo, &emsg);
else
{
ERROR ("erlang plugin: Received request for invalid function `%s'.",
/* Dunno if calling this multiple times is legal. Since it wants to have
* the sin_addr for some reason this is the best place to call this,
* though. -octo */
- status = erl_connect_xinit (/* host name = */ "leeloo",
- /* plain node name = */ "collectd",
- /* full node name = */ "collectd@leeloo.lan.home.verplant.org",
+ status = erl_connect_xinit (/* host name = */ conf_hostname,
+ /* plain node name = */ conf_nodename,
+ /* full node name = */ conf_fullname,
/* our address = */ sin_addr,
/* secret cookie = */ conf_cookie,
/* instance number = */ 0);
status = erl_publish (numeric_serv);
if (status < 0)
{
- ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
+ ERROR ("erlang plugin: erl_publish (%i) failed with status %i. "
+ "Is epmd running?", numeric_serv, status);
close (sock_descr);
sock_descr = -1;
return (-1);
}
}
+ if (sock_descr >= 0)
+ {
+ INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, "
+ "Cookie %s.",
+ conf_fullname, numeric_serv, conf_cookie);
+ }
+
return (sock_descr);
} /* }}} int create_listen_socket */
fd = erl_accept (listen, &conn);
if (fd < 0)
{
- ERROR ("erlang plugin: erl_accept failed with status %i.", fd);
- close (listen);
- exit (EXIT_FAILURE);
+ char errbuf[1024];
+ ERROR ("erlang plugin: erl_accept failed: %s",
+ sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+ continue;
}
DEBUG ("erlang plugin: Got connection from %s on fd %i.",
conn.nodename, fd);
return (0);
} /* }}} int ce_init */
+static int ce_config (const char *key, const char *value) /* {{{ */
+{
+ if (strcasecmp ("BindTo", key) == 0)
+ {
+ sstrncpy (conf_node, value, sizeof (conf_node));
+ }
+ else if (strcasecmp ("BindPort", key) == 0)
+ {
+ sstrncpy (conf_service, value, sizeof (conf_service));
+ }
+ else if (strcasecmp ("Cookie", key) == 0)
+ {
+ sstrncpy (conf_cookie, value, sizeof (conf_cookie));
+ }
+ else if (strcasecmp ("NodeName", key) == 0)
+ {
+ const char *host;
+
+ host = strchr (value, '@');
+ if (host == NULL)
+ {
+ sstrncpy (conf_nodename, value, sizeof (conf_nodename));
+ sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname));
+ ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s",
+ conf_nodename, conf_hostname);
+ }
+ else /* if (host != NULL) */
+ {
+ char *tmp;
+
+ sstrncpy (conf_nodename, value, sizeof (conf_nodename));
+ sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname));
+ sstrncpy (conf_fullname, value, sizeof (conf_fullname));
+
+ tmp = strchr (conf_nodename, '@');
+ if (tmp != NULL)
+ *tmp = 0;
+ }
+ }
+ else
+ {
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int ce_config */
+
void module_register (void)
{
+ plugin_register_config ("erlang", ce_config, config_keys, config_keys_num);
plugin_register_init ("erlang", ce_init);
}