From a4e4fad8a90a099257e1a26ffae0ef8cdf4f6704 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sat, 14 Nov 2009 10:13:39 +0100 Subject: [PATCH] erlang plugin: Actually register a read callback. --- src/erlang.c | 154 +++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 130 insertions(+), 24 deletions(-) diff --git a/src/erlang.c b/src/erlang.c index 3da9e0f4..97286140 100644 --- a/src/erlang.c +++ b/src/erlang.c @@ -43,6 +43,13 @@ struct ce_connection_info_s }; typedef struct ce_connection_info_s ce_connection_info_t; +struct ce_callback_info_s +{ + int fd; + ETERM *fun; +}; +typedef struct ce_callback_info_s ce_callback_info_t; + /* * Private variables */ @@ -65,9 +72,29 @@ static char conf_hostname[256] = "alyja"; static char conf_nodename[256] = "collectd"; static char conf_fullname[256] = "collectd@alyja.office.noris.de"; +static int connection_counter = 1; +static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER; + /* * Private functions */ +static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */ +{ + if (ci == NULL) + return; + + if (ci->fd >= 0) + { + erl_close_connection (ci->fd); + ci->fd = -1; + } + + if (ci->fun != NULL) + erl_free_compound (ci->fun); + + free (ci); +} /* }}} void ce_free_callback_info */ + static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */ { ETERM *reply; @@ -468,6 +495,44 @@ static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */ return (0); } /* }}} int eterm_to_value_list */ +static int ce_read (user_data_t *ud) /* {{{ */ +{ + ce_callback_info_t *ci; + ETERM *rpc_args; + ETERM *rpc_reply; + + if ((ud == NULL) || (ud->data == NULL)) + return (-1); + + ci = ud->data; + + rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun)); + if (rpc_args == NULL) + { + ERROR ("erlang plugin: erl_format failed."); + return (-1); + } + + DEBUG ("erlang plugin: Making remote procedure call ..."); + rpc_reply = erl_rpc (ci->fd, + /* module = */ "erlang", /* function = */ "apply", + /* arguments = */ rpc_args); + DEBUG ("erlang plugin: ... done."); + erl_free_compound (rpc_args); + if (rpc_reply == NULL) + { + char errbuf[1024]; + ERROR ("erlang plugin: erl_rpc failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + return (-1); + } + + /* FIXME: The return value is not yet used. */ + erl_free_compound (rpc_reply); + + return (0); +} /* }}} int ce_read */ + /* Returns non-zero only if the request could not be handled gracefully. */ static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ const ErlMessage *req) @@ -509,8 +574,11 @@ static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */ const ErlMessage *req) { ETERM *eterm_cb; - ETERM *rpc_args; - ETERM *rpc_reply; + ce_callback_info_t *ci; + user_data_t ud; + int status; + int connection_number; + char callback_name[64]; if ((cinfo == NULL) || (req == NULL)) return (EINVAL); @@ -520,37 +588,75 @@ static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */ if (ERL_TYPE (eterm_cb) != ERL_FUNCTION) { erl_free_term (eterm_cb); - send_error (cinfo->fd, req->from, + status = send_error (cinfo->fd, req->from, "Argument to `register_read' must be a callback function."); - return (0); + return (status); } - send_atom (cinfo->fd, req->from, "success"); - - /* FIXME: The following demonstrates how to call this function. This should - * be moved somewhere else, of course. */ - - /* --- 8< --- */ - rpc_args = erl_format ("[~w,[]]", eterm_cb); - if (rpc_args == NULL) + ci = malloc (sizeof (ci)); + if (ci == NULL) { erl_free_term (eterm_cb); - send_error (cinfo->fd, req->from, - "erl_format failed. Sorry."); - return (0); + status = send_error (cinfo->fd, req->from, "malloc failed."); + return (status); } - rpc_reply = erl_rpc (cinfo->fd, - /* module = */ "erlang", /* function = */ "apply", - /* arguments = */ rpc_args); + /* Lock around `erl_connect_init' and `erl_connect'. */ + pthread_mutex_lock (&connection_lock); - erl_free_compound (rpc_args); - /* Right now, the return value is not used. */ - erl_free_compound (rpc_reply); - /* --- >8 --- */ + connection_number = connection_counter; + connection_counter++; - erl_free_term (eterm_cb); /* XXX: This must stay here. */ - return (0); + /* Create a new `cnode' for each connection. Otherwise we cannot determine + * which RPC call a message belongs to. */ + status = erl_connect_init (connection_number, conf_cookie, + /* creation = */ 0); + if (!status) /* Yes, it's this way around in this case ... {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect_init failed: %s", + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + ci->fd = erl_connect (cinfo->conn.nodename); + if (ci->fd < 0) /* {{{ */ + { + char errbuf[1024]; + pthread_mutex_unlock (&connection_lock); + ERROR ("erlang plugin: erl_connect(%s) failed: %s", + cinfo->conn.nodename, + sstrerror (erl_errno, errbuf, sizeof (errbuf))); + sfree (ci); + erl_free_term (eterm_cb); + status = send_error (cinfo->fd, req->from, "erl_connect failed."); + return (status); + } /* }}} */ + + pthread_mutex_unlock (&connection_lock); + + ci->fun = eterm_cb; + + memset (&ud, 0, sizeof (ud)); + ud.data = ci; + ud.free_func = (void (*) (void *)) ce_free_callback_info; + + ssnprintf (callback_name, sizeof (callback_name), "erlang:%i", + connection_number); + + status = plugin_register_complex_read (callback_name, + ce_read, /* interval = */ NULL, &ud); + if (status == 0) + status = send_atom (cinfo->fd, req->from, "success"); + else + status = send_error (cinfo->fd, req->from, + "plugin_register_complex_read failed."); + + return (status); } /* }}} int handle_dispatch_values */ static void *handle_client_thread (void *arg) /* {{{ */ -- 2.11.0