erlang plugin: Have "send_error" free the *entire* term.
[collectd.git] / src / erlang.c
index 8317d82..96f77ae 100644 (file)
@@ -20,6 +20,7 @@
  **/
 
 #include "collectd.h"
+#include "common.h"
 #include "plugin.h"
 
 #include <sys/types.h>
@@ -42,18 +43,58 @@ struct ce_connection_info_s
 };
 typedef struct ce_connection_info_s ce_connection_info_t;
 
+struct ce_callback_info_s
+{
+       int fd;
+       ETERM *fun;
+};
+typedef struct ce_callback_info_s ce_callback_info_t;
+
 /*
  * Private variables
  */
 static pthread_t listen_thread_id;
 static _Bool     listen_thread_running = false;
 
+static const char *config_keys[] =
+{
+       "BindTo",
+       "BindPort",
+       "Cookie",
+       "NodeName"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static char conf_node[NI_MAXHOST] = "";
 static char conf_service[NI_MAXSERV] = "29157";
 static char conf_cookie[256] = "ceisaequ";
+static char conf_hostname[256] = "alyja";
+static char conf_nodename[256] = "collectd";
+static char conf_fullname[256] = "collectd@alyja.office.noris.de";
+
+static int connection_counter = 1;
+static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /*
  * Private functions
  */
+static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */
+{
+       if (ci == NULL)
+               return;
+
+       if (ci->fd >= 0)
+       {
+               erl_close_connection (ci->fd);
+               ci->fd = -1;
+       }
+
+       if (ci->fun != NULL)
+               erl_free_compound (ci->fun);
+
+       free (ci);
+} /* }}} void ce_free_callback_info */
+
 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
 {
        ETERM *reply;
@@ -86,7 +127,7 @@ static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
        else
                status = 0;
 
-       erl_free_term (reply);
+       erl_free_compound (reply);
 
        return (status);
 } /* }}} int send_error */
@@ -110,13 +151,17 @@ static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */
                        *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5);
                        break;
 
+#ifdef ERL_LONGLONG
                case ERL_LONGLONG:
                        *ret_int = (int) ERL_LL_VALUE (term);
                        break;
+#endif /* ERL_LONGLONG */
 
+#ifdef ERL_U_LONGLONG
                case ERL_U_LONGLONG:
                        *ret_int = (int) ERL_LL_UVALUE (term);
                        break;
+#endif /* ERL_U_LONGLONG */
 
                default:
                        ERROR ("erlang plugin: Don't know how to cast "
@@ -169,13 +214,17 @@ static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */
                        *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5);
                        break;
 
+#ifdef ERL_LONGLONG
                case ERL_LONGLONG:
                        *ret_time = (time_t) ERL_LL_VALUE (term);
                        break;
+#endif /* ERL_LONGLONG */
 
+#ifdef ERL_U_LONGLONG
                case ERL_U_LONGLONG:
                        *ret_time = (time_t) ERL_LL_UVALUE (term);
                        break;
+#endif /* ERL_U_LONGLONG */
 
                default:
                        ERROR ("erlang plugin: Don't know how to cast "
@@ -264,6 +313,7 @@ static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
                        break;
                }
 
+#ifdef ERL_LONGLONG
                case ERL_LONGLONG:
                {
                        long long v = ERL_LL_VALUE (term);
@@ -276,7 +326,9 @@ static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
                        }
                        break;
                }
+#endif /* ERL_LONGLONG */
 
+#ifdef ERL_U_LONGLONG
                case ERL_U_LONGLONG:
                {
                        unsigned long long v = ERL_LL_UVALUE (term);
@@ -289,6 +341,7 @@ static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
                        }
                        break;
                }
+#endif /* ERL_U_LONGLONG */
 
                default:
                        ERROR ("erlang plugin: Don't know how to cast "
@@ -442,6 +495,44 @@ static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */
        return (0);
 } /* }}} int eterm_to_value_list */
 
+static int ce_read (user_data_t *ud) /* {{{ */
+{
+       ce_callback_info_t *ci;
+       ETERM *rpc_args;
+       ETERM *rpc_reply;
+
+       if ((ud == NULL) || (ud->data == NULL))
+               return (-1);
+
+       ci = ud->data;
+       
+       rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun));
+       if (rpc_args == NULL)
+       {
+               ERROR ("erlang plugin: erl_format failed.");
+               return (-1);
+       }
+
+       DEBUG ("erlang plugin: Making remote procedure call ...");
+       rpc_reply = erl_rpc (ci->fd,
+                       /* module = */ "erlang", /* function = */ "apply",
+                       /* arguments = */ rpc_args);
+       DEBUG ("erlang plugin: ... done.");
+       erl_free_compound (rpc_args);
+       if (rpc_reply == NULL)
+       {
+                       char errbuf[1024];
+                       ERROR ("erlang plugin: erl_rpc failed: %s",
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       return (-1);
+       }
+
+       /* FIXME: The return value is not yet used. */
+       erl_free_compound (rpc_reply);
+
+       return (0);
+} /* }}} int ce_read */
+
 /* Returns non-zero only if the request could not be handled gracefully. */
 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
                const ErlMessage *req)
@@ -460,22 +551,112 @@ static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
        if (status != 0)
        {
                free (vl.values);
-               send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
-               return (0);
+               status = send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
+               return (status);
        }
 
        status = plugin_dispatch_values (&vl);
        if (status != 0)
        {
                free (vl.values);
-               send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
-               return (0);
+               status = send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
+               return (status);
        }
 
        free (vl.values);
-       send_atom (cinfo->fd, req->from, "success");
+       status = send_atom (cinfo->fd, req->from, "success");
 
-       return (0);
+       return (status);
+} /* }}} int handle_dispatch_values */
+
+/* Returns non-zero only if the request could not be handled gracefully. */
+static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */
+               const ErlMessage *req)
+{
+       ETERM *eterm_cb;
+       ce_callback_info_t *ci;
+       user_data_t ud;
+       int status;
+       int connection_number;
+       char callback_name[64];
+
+       if ((cinfo == NULL) || (req == NULL))
+               return (EINVAL);
+
+       eterm_cb = erl_element (2, req->msg);
+
+       if (ERL_TYPE (eterm_cb) != ERL_FUNCTION)
+       {
+               erl_free_term (eterm_cb);
+               status = send_error (cinfo->fd, req->from,
+                               "Argument to `register_read' must be a callback function.");
+               return (status);
+       }
+
+       ci = malloc (sizeof (ci));
+       if (ci == NULL)
+       {
+               erl_free_term (eterm_cb);
+               status = send_error (cinfo->fd, req->from, "malloc failed.");
+               return (status);
+       }
+
+       /* Lock around `erl_connect_init' and `erl_connect'. */
+       pthread_mutex_lock (&connection_lock);
+
+       connection_number = connection_counter;
+       connection_counter++;
+
+       /* Create a new `cnode' for each connection. Otherwise we cannot determine
+        * which RPC call a message belongs to. */
+       status = erl_connect_init (connection_number, conf_cookie,
+                       /* creation = */ 0);
+       if (!status) /* Yes, it's this way around in this case ... {{{ */
+       {
+                       char errbuf[1024];
+                       pthread_mutex_unlock (&connection_lock);
+                       ERROR ("erlang plugin: erl_connect_init failed: %s",
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       sfree (ci);
+                       erl_free_term (eterm_cb);
+                       status = send_error (cinfo->fd, req->from, "erl_connect failed.");
+                       return (status);
+       } /* }}} */
+
+       ci->fd = erl_connect (cinfo->conn.nodename);
+       if (ci->fd < 0) /* {{{ */
+       {
+                       char errbuf[1024];
+                       pthread_mutex_unlock (&connection_lock);
+                       ERROR ("erlang plugin: erl_connect(%s) failed: %s",
+                                       cinfo->conn.nodename,
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       sfree (ci);
+                       erl_free_term (eterm_cb);
+                       status = send_error (cinfo->fd, req->from, "erl_connect failed.");
+                       return (status);
+       } /* }}} */
+
+       pthread_mutex_unlock (&connection_lock);
+
+       ci->fun = eterm_cb;
+
+       memset (&ud, 0, sizeof (ud));
+       ud.data = ci;
+       ud.free_func = (void (*) (void *)) ce_free_callback_info;
+
+       ssnprintf (callback_name, sizeof (callback_name), "erlang:%i",
+                       connection_number);
+
+       status = plugin_register_complex_read (callback_name,
+                       ce_read, /* interval = */ NULL, &ud);
+       if (status == 0)
+               status = send_atom (cinfo->fd, req->from, "success");
+       else
+               status = send_error (cinfo->fd, req->from,
+                               "plugin_register_complex_read failed.");
+
+       return (status);
 } /* }}} int handle_dispatch_values */
 
 static void *handle_client_thread (void *arg) /* {{{ */
@@ -536,6 +717,8 @@ static void *handle_client_thread (void *arg) /* {{{ */
                        reply = NULL;
                        if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
                                status = handle_dispatch_values (cinfo, &emsg);
+                       else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0)
+                               status = handle_register_read (cinfo, &emsg);
                        else
                        {
                                ERROR ("erlang plugin: Received request for invalid function `%s'.",
@@ -627,9 +810,9 @@ static int create_listen_socket (void) /* {{{ */
                /* Dunno if calling this multiple times is legal. Since it wants to have
                 * the sin_addr for some reason this is the best place to call this,
                 * though. -octo */
-               status = erl_connect_xinit (/* host name = */ "leeloo",
-                               /* plain node name = */ "collectd",
-                               /* full node name  = */ "collectd@leeloo.lan.home.verplant.org",
+               status = erl_connect_xinit (/* host name = */ conf_hostname,
+                               /* plain node name = */ conf_nodename,
+                               /* full node name  = */ conf_fullname,
                                /* our address     = */ sin_addr,
                                /* secret cookie   = */ conf_cookie,
                                /* instance number = */ 0);
@@ -686,13 +869,21 @@ static int create_listen_socket (void) /* {{{ */
                status = erl_publish (numeric_serv);
                if (status < 0)
                {
-                       ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
+                       ERROR ("erlang plugin: erl_publish (%i) failed with status %i. "
+                                       "Is epmd running?", numeric_serv, status);
                        close (sock_descr);
                        sock_descr = -1;
                        return (-1);
                }
        }
 
+       if (sock_descr >= 0)
+       {
+               INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, "
+                               "Cookie %s.",
+                               conf_fullname, numeric_serv, conf_cookie);
+       }
+
        return (sock_descr);
 } /* }}} int create_listen_socket */
 
@@ -720,9 +911,10 @@ void *listen_thread (void *arg) /* {{{ */
                fd = erl_accept (listen, &conn);
                if (fd < 0)
                {
-                       ERROR ("erlang plugin: erl_accept failed with status %i.", fd);
-                       close (listen);
-                       exit (EXIT_FAILURE);
+                       char errbuf[1024];
+                       ERROR ("erlang plugin: erl_accept failed: %s",
+                                       sstrerror (erl_errno, errbuf, sizeof (errbuf)));
+                       continue;
                }
                DEBUG ("erlang plugin: Got connection from %s on fd %i.",
                                conn.nodename, fd);
@@ -766,8 +958,56 @@ static int ce_init (void) /* {{{ */
        return (0);
 } /* }}} int ce_init */
 
+static int ce_config (const char *key, const char *value) /* {{{ */
+{
+       if (strcasecmp ("BindTo", key) == 0)
+       {
+               sstrncpy (conf_node, value, sizeof (conf_node));
+       }
+       else if (strcasecmp ("BindPort", key) == 0)
+       {
+               sstrncpy (conf_service, value, sizeof (conf_service));
+       }
+       else if (strcasecmp ("Cookie", key) == 0)
+       {
+               sstrncpy (conf_cookie, value, sizeof (conf_cookie));
+       }
+       else if (strcasecmp ("NodeName", key) == 0)
+       {
+               const char *host;
+
+               host = strchr (value, '@');
+               if (host == NULL)
+               {
+                       sstrncpy (conf_nodename, value, sizeof (conf_nodename));
+                       sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname));
+                       ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s",
+                                       conf_nodename, conf_hostname);
+               }
+               else /* if (host != NULL) */
+               {
+                       char *tmp;
+
+                       sstrncpy (conf_nodename, value, sizeof (conf_nodename));
+                       sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname));
+                       sstrncpy (conf_fullname, value, sizeof (conf_fullname));
+
+                       tmp = strchr (conf_nodename, '@');
+                       if (tmp != NULL)
+                               *tmp = 0;
+               }
+       }
+       else
+       {
+               return (-1);
+       }
+
+       return (0);
+} /* }}} int ce_config */
+
 void module_register (void)
 {
+       plugin_register_config ("erlang", ce_config, config_keys, config_keys_num);
        plugin_register_init ("erlang", ce_init);
 }