erlang plugin: Actually register a read callback.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 14 Nov 2009 09:13:39 +0000 (10:13 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 14 Nov 2009 09:13:39 +0000 (10:13 +0100)
src/erlang.c

index 3da9e0f..9728614 100644 (file)
@@ -43,6 +43,13 @@ struct ce_connection_info_s
 };
 typedef struct ce_connection_info_s ce_connection_info_t;
 
+struct ce_callback_info_s
+{
+       int fd;
+       ETERM *fun;
+};
+typedef struct ce_callback_info_s ce_callback_info_t;
+
 /*
  * Private variables
  */
@@ -65,9 +72,29 @@ static char conf_hostname[256] = "alyja";
 static char conf_nodename[256] = "collectd";
 static char conf_fullname[256] = "collectd@alyja.office.noris.de";
 
+static int connection_counter = 1;
+static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
+
 /*
  * Private functions
  */
+static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */
+{
+       if (ci == NULL)
+               return;
+
+       if (ci->fd >= 0)
+       {
+               erl_close_connection (ci->fd);
+               ci->fd = -1;
+       }
+
+       if (ci->fun != NULL)
+               erl_free_compound (ci->fun);
+
+       free (ci);
+} /* }}} void ce_free_callback_info */
+
 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
 {
        ETERM *reply;
@@ -468,6 +495,44 @@ static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */
        return (0);
 } /* }}} int eterm_to_value_list */
 
+static int ce_read (user_data_t *ud) /* {{{ */
+{
+       ce_callback_info_t *ci;
+       ETERM *rpc_args;
+       ETERM *rpc_reply;
+
+       if ((ud == NULL) || (ud->data == NULL))
+               return (-1);
+
+       ci = ud->data;
+       
+       rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun));
+       if (rpc_args == NULL)
+       {
+               ERROR ("erlang plugin: erl_format failed.");
+               return (-1);
+       }
+
+       DEBUG ("erlang plugin: Making remote procedure call ...");
+       rpc_reply = erl_rpc (ci->fd,
+                       /* module = */ "erlang", /* function = */ "apply",
+                       /* arguments = */ rpc_args);
+       DEBUG ("erlang plugin: ... done.");
+       erl_free_compound (rpc_args);
+       if (rpc_reply == NULL)
+       {
+                       char errbuf[1024];
+                       ERROR ("erlang plugin: erl_rpc failed: %s",
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       return (-1);
+       }
+
+       /* FIXME: The return value is not yet used. */
+       erl_free_compound (rpc_reply);
+
+       return (0);
+} /* }}} int ce_read */
+
 /* Returns non-zero only if the request could not be handled gracefully. */
 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
                const ErlMessage *req)
@@ -509,8 +574,11 @@ static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */
                const ErlMessage *req)
 {
        ETERM *eterm_cb;
-       ETERM *rpc_args;
-       ETERM *rpc_reply;
+       ce_callback_info_t *ci;
+       user_data_t ud;
+       int status;
+       int connection_number;
+       char callback_name[64];
 
        if ((cinfo == NULL) || (req == NULL))
                return (EINVAL);
@@ -520,37 +588,75 @@ static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */
        if (ERL_TYPE (eterm_cb) != ERL_FUNCTION)
        {
                erl_free_term (eterm_cb);
-               send_error (cinfo->fd, req->from,
+               status = send_error (cinfo->fd, req->from,
                                "Argument to `register_read' must be a callback function.");
-               return (0);
+               return (status);
        }
 
-       send_atom (cinfo->fd, req->from, "success");
-
-       /* FIXME: The following demonstrates how to call this function. This should
-        * be moved somewhere else, of course. */
-
-       /* --- 8< --- */
-       rpc_args = erl_format ("[~w,[]]", eterm_cb);
-       if (rpc_args == NULL)
+       ci = malloc (sizeof (ci));
+       if (ci == NULL)
        {
                erl_free_term (eterm_cb);
-               send_error (cinfo->fd, req->from,
-                               "erl_format failed. Sorry.");
-               return (0);
+               status = send_error (cinfo->fd, req->from, "malloc failed.");
+               return (status);
        }
 
-       rpc_reply = erl_rpc (cinfo->fd,
-                       /* module = */ "erlang", /* function = */ "apply",
-                       /* arguments = */ rpc_args);
+       /* Lock around `erl_connect_init' and `erl_connect'. */
+       pthread_mutex_lock (&connection_lock);
 
-       erl_free_compound (rpc_args);
-       /* Right now, the return value is not used. */
-       erl_free_compound (rpc_reply);
-       /* --- >8 --- */
+       connection_number = connection_counter;
+       connection_counter++;
 
-       erl_free_term (eterm_cb); /* XXX: This must stay here. */
-       return (0);
+       /* Create a new `cnode' for each connection. Otherwise we cannot determine
+        * which RPC call a message belongs to. */
+       status = erl_connect_init (connection_number, conf_cookie,
+                       /* creation = */ 0);
+       if (!status) /* Yes, it's this way around in this case ... {{{ */
+       {
+                       char errbuf[1024];
+                       pthread_mutex_unlock (&connection_lock);
+                       ERROR ("erlang plugin: erl_connect_init failed: %s",
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       sfree (ci);
+                       erl_free_term (eterm_cb);
+                       status = send_error (cinfo->fd, req->from, "erl_connect failed.");
+                       return (status);
+       } /* }}} */
+
+       ci->fd = erl_connect (cinfo->conn.nodename);
+       if (ci->fd < 0) /* {{{ */
+       {
+                       char errbuf[1024];
+                       pthread_mutex_unlock (&connection_lock);
+                       ERROR ("erlang plugin: erl_connect(%s) failed: %s",
+                                       cinfo->conn.nodename,
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       sfree (ci);
+                       erl_free_term (eterm_cb);
+                       status = send_error (cinfo->fd, req->from, "erl_connect failed.");
+                       return (status);
+       } /* }}} */
+
+       pthread_mutex_unlock (&connection_lock);
+
+       ci->fun = eterm_cb;
+
+       memset (&ud, 0, sizeof (ud));
+       ud.data = ci;
+       ud.free_func = (void (*) (void *)) ce_free_callback_info;
+
+       ssnprintf (callback_name, sizeof (callback_name), "erlang:%i",
+                       connection_number);
+
+       status = plugin_register_complex_read (callback_name,
+                       ce_read, /* interval = */ NULL, &ud);
+       if (status == 0)
+               status = send_atom (cinfo->fd, req->from, "success");
+       else
+               status = send_error (cinfo->fd, req->from,
+                               "plugin_register_complex_read failed.");
+
+       return (status);
 } /* }}} int handle_dispatch_values */
 
 static void *handle_client_thread (void *arg) /* {{{ */