erlang plugin: Hint towards epmd if erl_publish fails.
[collectd.git] / src / erlang.c
index 4b488b4..5b84675 100644 (file)
@@ -20,6 +20,7 @@
  **/
 
 #include "collectd.h"
+#include "common.h"
 #include "plugin.h"
 
 #include <sys/types.h>
@@ -48,8 +49,21 @@ typedef struct ce_connection_info_s ce_connection_info_t;
 static pthread_t listen_thread_id;
 static _Bool     listen_thread_running = false;
 
+static const char *config_keys[] =
+{
+       "BindTo",
+       "BindPort",
+       "Cookie",
+       "NodeName"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static char conf_node[NI_MAXHOST] = "";
 static char conf_service[NI_MAXSERV] = "29157";
 static char conf_cookie[256] = "ceisaequ";
+static char conf_hostname[256] = "alyja";
+static char conf_nodename[256] = "collectd";
+static char conf_fullname[256] = "collectd@alyja.office.noris.de";
 
 /*
  * Private functions
@@ -91,30 +105,400 @@ static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
        return (status);
 } /* }}} int send_error */
 
+static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */
+{
+       if ((term == NULL) || (ret_int == NULL))
+               return (EINVAL);
+
+       switch (ERL_TYPE (term))
+       {
+               case ERL_INTEGER:
+                       *ret_int = (int) ERL_INT_VALUE (term);
+                       break;
+
+               case ERL_U_INTEGER:
+                       *ret_int = (int) ERL_INT_UVALUE (term);
+                       break;
+
+               case ERL_FLOAT:
+                       *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5);
+                       break;
+
+#ifdef ERL_LONGLONG
+               case ERL_LONGLONG:
+                       *ret_int = (int) ERL_LL_VALUE (term);
+                       break;
+#endif /* ERL_LONGLONG */
+
+#ifdef ERL_U_LONGLONG
+               case ERL_U_LONGLONG:
+                       *ret_int = (int) ERL_LL_UVALUE (term);
+                       break;
+#endif /* ERL_U_LONGLONG */
+
+               default:
+                       ERROR ("erlang plugin: Don't know how to cast "
+                                       "erlang type %#x to int.", (unsigned int) ERL_TYPE (term));
+                       return (ENOTSUP);
+       } /* switch (ERL_TYPE (term)) */
+
+       return (0);
+} /* }}} int eterm_to_int */
+
+static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */
+{
+       if ((term == NULL) || (ret_time == NULL))
+               return (EINVAL);
+
+       if (ERL_IS_NIL (term)
+                       || (ERL_IS_ATOM (term)
+                               && ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
+                                       || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))))
+       {
+               *ret_time = time (NULL);
+               return (0);
+       }
+
+       switch (ERL_TYPE (term))
+       {
+               case ERL_INTEGER:
+                       *ret_time = (time_t) ERL_INT_VALUE (term);
+                       break;
+
+               case ERL_U_INTEGER:
+                       *ret_time = (time_t) ERL_INT_UVALUE (term);
+                       break;
+
+               case ERL_ATOM:
+                       if ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
+                                       || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))
+                       {
+                               *ret_time = time (NULL);
+                       }
+                       else
+                       {
+                               ERROR ("erlang plugin: Invalid atom for time: %s.",
+                                               ERL_ATOM_PTR (term));
+                               return (ENOTSUP);
+                       }
+                       break;
+
+               case ERL_FLOAT:
+                       *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5);
+                       break;
+
+#ifdef ERL_LONGLONG
+               case ERL_LONGLONG:
+                       *ret_time = (time_t) ERL_LL_VALUE (term);
+                       break;
+#endif /* ERL_LONGLONG */
+
+#ifdef ERL_U_LONGLONG
+               case ERL_U_LONGLONG:
+                       *ret_time = (time_t) ERL_LL_UVALUE (term);
+                       break;
+#endif /* ERL_U_LONGLONG */
+
+               default:
+                       ERROR ("erlang plugin: Don't know how to cast "
+                                       "erlang type %#x to time_t.", (unsigned int) ERL_TYPE (term));
+                       return (ENOTSUP);
+       } /* switch (ERL_TYPE (term)) */
+
+       return (0);
+} /* }}} int eterm_to_time_t */
+
+static int eterm_to_string (const ETERM *term, char *buffer, size_t buffer_size) /* {{{ */
+{
+       char *tmp;
+
+       if ((term == NULL) || (buffer == NULL) || (buffer_size <= 0))
+               return (EINVAL);
+
+       memset (buffer, 0, buffer_size);
+
+       if (ERL_IS_EMPTY_LIST (term)
+                       || ERL_IS_NIL (term)
+                       || (ERL_IS_ATOM (term)
+                               && (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0)))
+       {
+               buffer[0] = 0;
+               return (0);
+       }
+
+       if (!ERL_IS_LIST (term))
+               return (-1);
+
+       tmp = erl_iolist_to_string (term);
+       if (tmp == NULL)
+               return (-1);
+
+       strncpy (buffer, tmp, buffer_size - 1);
+       erl_free (tmp);
+
+       return (0);
+} /* }}} int eterm_to_string */
+
+static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
+               value_t *value)
+{
+       if ((term == NULL) || (value == NULL))
+               return (EINVAL);
+
+       switch (ERL_TYPE (term))
+       {
+               case ERL_INTEGER:
+               {
+                       int v = ERL_INT_VALUE (term);
+                       switch (ds_type)
+                       {
+                               case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
+                               case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
+                               case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
+                               case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
+                       }
+                       break;
+               }
+
+               case ERL_U_INTEGER:
+               {
+                       unsigned int v = ERL_INT_UVALUE (term);
+                       switch (ds_type)
+                       {
+                               case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
+                               case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
+                               case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
+                               case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
+                       }
+                       break;
+               }
+
+               case ERL_FLOAT:
+               {
+                       double v = ERL_FLOAT_VALUE (term);
+                       switch (ds_type)
+                       {
+                               case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
+                               case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
+                               case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
+                               case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
+                       }
+                       break;
+               }
+
+#ifdef ERL_LONGLONG
+               case ERL_LONGLONG:
+               {
+                       long long v = ERL_LL_VALUE (term);
+                       switch (ds_type)
+                       {
+                               case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
+                               case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
+                               case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
+                               case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
+                       }
+                       break;
+               }
+#endif /* ERL_LONGLONG */
+
+#ifdef ERL_U_LONGLONG
+               case ERL_U_LONGLONG:
+               {
+                       unsigned long long v = ERL_LL_UVALUE (term);
+                       switch (ds_type)
+                       {
+                               case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
+                               case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
+                               case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
+                               case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
+                       }
+                       break;
+               }
+#endif /* ERL_U_LONGLONG */
+
+               default:
+                       ERROR ("erlang plugin: Don't know how to cast "
+                                       "erlang type %#x to value_t.", (unsigned int) ERL_TYPE (term));
+                       return (ENOTSUP);
+       } /* switch (ERL_TYPE (term)) */
+
+       return (0);
+} /* }}} int eterm_to_value */
+
+static int eterm_to_values (const ETERM *term, const data_set_t *ds, /* {{{ */
+               value_list_t *vl)
+{
+       int ds_index;
+       int status;
+
+       if ((term == NULL) || (ds == NULL) || (vl == NULL))
+               return (EINVAL);
+
+       if (!ERL_IS_LIST (term))
+               return (-1);
+
+       free (vl->values);
+       vl->values = NULL;
+       vl->values_len = 0;
+
+       while (!ERL_IS_EMPTY_LIST (term))
+       {
+               const ETERM *eterm_value;
+               value_t *tmp;
+
+               if (ds_index >= ds->ds_num)
+               {
+                       ds_index = ds->ds_num + 1;
+                       status = 0;
+                       break;
+               }
+
+               tmp = realloc (vl->values, sizeof (*tmp) * (vl->values_len + 1));
+               if (tmp == NULL)
+               {
+                       status = ENOMEM;
+                       break;
+               }
+               vl->values = tmp;
+
+               eterm_value = ERL_CONS_HEAD (term);
+               term = ERL_CONS_TAIL (term);
+
+               status = eterm_to_value (eterm_value, ds->ds[ds_index].type,
+                               vl->values + vl->values_len);
+               if (status != 0)
+                       break;
+
+               vl->values_len++;
+               ds_index++;
+       }
+
+       if ((status == 0) && (ds_index != ds->ds_num))
+               NOTICE ("erlang plugin: Incorrect number of values received for type %s: "
+                               "Expected %i, got %i.", ds->type, ds->ds_num, ds_index);
+
+       if ((status != 0) || (ds_index != ds->ds_num))
+       {
+               free (vl->values);
+               vl->values = NULL;
+               vl->values_len = 0;
+               return (status);
+       }
+
+       return (0);
+} /* }}} int eterm_to_values */
+
+static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */
+{
+       ETERM *tmp;
+       int status;
+       const data_set_t *ds;
+
+       if ((term == NULL) || (vl == NULL))
+               return (EINVAL);
+
+       if (!ERL_IS_TUPLE (term) || (ERL_TUPLE_SIZE (term) != 9))
+               return (EINVAL);
+
+       tmp = erl_element (1, term);
+       if (!ERL_IS_ATOM (tmp)
+                       || (strcmp ("value_list", ERL_ATOM_PTR (tmp)) != 0))
+       {
+               erl_free_term (tmp);
+               return (-1);
+       }
+       erl_free_term (tmp);
+
+       status = 0;
+       do
+       {
+#define TUPLE_ELEM_TO_CHAR_ARRAY(idx,buf) \
+               tmp = erl_element ((idx), term); \
+               status = eterm_to_string (tmp, (buf), sizeof (buf)); \
+               erl_free_term (tmp); \
+               if (status != 0) \
+                       break;
+
+               TUPLE_ELEM_TO_CHAR_ARRAY (2, vl->host);
+               TUPLE_ELEM_TO_CHAR_ARRAY (3, vl->plugin);
+               TUPLE_ELEM_TO_CHAR_ARRAY (4, vl->plugin_instance);
+               TUPLE_ELEM_TO_CHAR_ARRAY (5, vl->type);
+               TUPLE_ELEM_TO_CHAR_ARRAY (6, vl->type_instance);
+
+               ds = plugin_get_ds (vl->type);
+               if (ds == NULL)
+               {
+                       status = -1;
+                       break;
+               }
+
+               tmp = erl_element (7, term);
+               status = eterm_to_time_t (tmp, &vl->time);
+               erl_free_term (tmp);
+               if (status != 0)
+                       break;
+
+               tmp = erl_element (8, term);
+               status = eterm_to_int (tmp, &vl->interval);
+               erl_free_term (tmp);
+               if (status != 0)
+                       break;
+               if (vl->interval < 1)
+                       vl->interval = interval_g;
+
+               tmp = erl_element (9, term);
+               status = eterm_to_values (tmp, ds, vl);
+               erl_free_term (tmp);
+               if (status != 0)
+                       break;
+
+#undef TUPLE_ELEM_TO_CHAR_ARRAY
+       } while (0);
+
+       if (status != 0)
+               return (status);
+
+       /* validate the struct */
+       if ((vl->host[0] == 0) || (vl->plugin[0] == 0) || (vl->type[0] == 0))
+               return (-1);
+
+       if (ds->ds_num != vl->values_len)
+               return (-1);
+
+       return (0);
+} /* }}} int eterm_to_value_list */
+
 /* Returns non-zero only if the request could not be handled gracefully. */
 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
                const ErlMessage *req)
 {
-       ETERM *vl;
+       ETERM *eterm_vl;
+       value_list_t vl;
+       int status;
+
+       memset (&vl, 0, sizeof (vl));
+       vl.values = NULL;
 
-       vl = erl_element (2, req->msg);
-       if ((vl == NULL) || !ERL_IS_TUPLE (vl))
+       eterm_vl = erl_element (2, req->msg);
+       status = eterm_to_value_list (eterm_vl, &vl);
+       erl_free_term (eterm_vl);
+
+       if (status != 0)
        {
-               erl_free_term (vl);
-               send_error (cinfo->fd, req->from, "Invalid format: VL not a tubple.");
+               free (vl.values);
+               send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
                return (0);
        }
 
-       /* We need: Identifier (5 parts), time, interval, values
-        * => 8 parts */
-       if (ERL_TUPLE_SIZE (vl) != 8)
+       status = plugin_dispatch_values (&vl);
+       if (status != 0)
        {
-               erl_free_term (vl);
-               send_error (cinfo->fd, req->from, "Invalid format: "
-                               "VL needs eight components.");
+               free (vl.values);
+               send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
                return (0);
        }
 
+       free (vl.values);
        send_atom (cinfo->fd, req->from, "success");
 
        return (0);
@@ -269,9 +653,9 @@ static int create_listen_socket (void) /* {{{ */
                /* Dunno if calling this multiple times is legal. Since it wants to have
                 * the sin_addr for some reason this is the best place to call this,
                 * though. -octo */
-               status = erl_connect_xinit (/* host name = */ "leeloo",
-                               /* plain node name = */ "collectd",
-                               /* full node name  = */ "collectd@leeloo.lan.home.verplant.org",
+               status = erl_connect_xinit (/* host name = */ conf_hostname,
+                               /* plain node name = */ conf_nodename,
+                               /* full node name  = */ conf_fullname,
                                /* our address     = */ sin_addr,
                                /* secret cookie   = */ conf_cookie,
                                /* instance number = */ 0);
@@ -328,7 +712,8 @@ static int create_listen_socket (void) /* {{{ */
                status = erl_publish (numeric_serv);
                if (status < 0)
                {
-                       ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
+                       ERROR ("erlang plugin: erl_publish (%i) failed with status %i. "
+                                       "Is epmd running?", numeric_serv, status);
                        close (sock_descr);
                        sock_descr = -1;
                        return (-1);
@@ -408,8 +793,56 @@ static int ce_init (void) /* {{{ */
        return (0);
 } /* }}} int ce_init */
 
+static int ce_config (const char *key, const char *value) /* {{{ */
+{
+       if (strcasecmp ("BindTo", key) == 0)
+       {
+               sstrncpy (conf_node, value, sizeof (conf_node));
+       }
+       else if (strcasecmp ("BindPort", key) == 0)
+       {
+               sstrncpy (conf_service, value, sizeof (conf_service));
+       }
+       else if (strcasecmp ("Cookie", key) == 0)
+       {
+               sstrncpy (conf_cookie, value, sizeof (conf_cookie));
+       }
+       else if (strcasecmp ("NodeName", key) == 0)
+       {
+               const char *host;
+
+               host = strchr (value, '@');
+               if (host == NULL)
+               {
+                       sstrncpy (conf_nodename, value, sizeof (conf_nodename));
+                       sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname));
+                       ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s",
+                                       conf_nodename, conf_hostname);
+               }
+               else /* if (host != NULL) */
+               {
+                       char *tmp;
+
+                       sstrncpy (conf_nodename, value, sizeof (conf_nodename));
+                       sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname));
+                       sstrncpy (conf_fullname, value, sizeof (conf_fullname));
+
+                       tmp = strchr (conf_nodename, '@');
+                       if (tmp != NULL)
+                               *tmp = 0;
+               }
+       }
+       else
+       {
+               return (-1);
+       }
+
+       return (0);
+} /* }}} int ce_config */
+
 void module_register (void)
 {
+       plugin_register_config ("erlang", ce_config, config_keys, config_keys_num);
        plugin_register_init ("erlang", ce_init);
 }