erlang plugin: Add some proof-of-concept code for Erlang interoperability.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Thu, 8 Oct 2009 17:40:34 +0000 (19:40 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Thu, 8 Oct 2009 17:40:34 +0000 (19:40 +0200)
It doesn't do anything useful yet, but it's possible to call C functions
from Erlang.

configure.in
src/Makefile.am
src/erlang.c [new file with mode: 0644]

index d21df3c..fa463dd 100644 (file)
@@ -1245,6 +1245,74 @@ fi
 AM_CONDITIONAL(BUILD_WITH_LIBDBI, test "x$with_libdbi" = "xyes")
 # }}}
 
+# --with-erlang {{{
+ERLANG_CPPFLAGS="$ERLANG_CPPFLAGS"
+ERLANG_LDFLAGS="$ERLANG_LDFLAGS"
+ERLANG_LIBS="$ERLANG_LIBS"
+AC_ARG_WITH(erlang, [AS_HELP_STRING([--with-erlang@<:@=PREFIX@:>@], [Path to Erlang / erl_interface.])],
+[
+       if test "x$withval" != "xno" && test "x$withval" != "xyes"
+       then
+               LDFLAGS="$LDFLAGS -L$withval/lib"
+               CPPFLAGS="$CPPFLAGS -I$withval/include -D_THREAD_SAFE"
+               with_erlang="yes"
+       else
+               with_erlang="$withval"
+       fi
+],
+[
+       with_erlang="yes"
+])
+if test "x$ERLANG_CPPFLAGS" = "x"
+then
+       ERLANG_CPPFLAGS="-D_REENTRANT"
+fi
+if test "x$ERLANG_LIBS" = "x"
+then
+       ERLANG_LIBS="-lei -lpthread"
+fi
+
+SAVE_CPPFLAGS="$CPPFLAGS"
+SAVE_LDFLAGS="$LDFLAGS"
+CPPFLAGS="$CPPFLAGS $ERLANG_CPPFLAGS"
+LDFLAGS="$LDFLAGS $ERLANG_LDFLAGS"
+
+if test "x$with_erlang" = "xyes"
+then
+       if test "x$ERLANG_CPPFLAGS" != "x"
+       then
+               AC_MSG_NOTICE([Erlang CPPFLAGS: $ERLANG_CPPFLAGS])
+       fi
+       AC_CHECK_HEADERS(erl_interface.h ei.h,
+                        [with_erlang="yes"],
+                        [with_erlang="no (Headers not found)"])
+fi
+if test "x$with_erlang" = "xyes"
+then
+       if test "x$ERLANG_LDFLAGS" != "x"
+       then
+               AC_MSG_NOTICE([Erlang LDFLAGS: $ERLANG_LDFLAGS])
+       fi
+       AC_CHECK_LIB(erl_interface, erl_connect_xinit,
+                    [with_erlang="yes"],
+                    [with_erlang="no (Symbol erl_connect_xinit not found)"],
+                    [$ERLANG_LIBS])
+fi
+if test "x$with_erlang" = "xyes"
+then
+       BUILD_WITH_ERLANG_CPPFLAGS="$ERLANG_CPPFLAGS"
+       BUILD_WITH_ERLANG_LDFLAGS="$ERLANG_LDFLAGS"
+       BUILD_WITH_ERLANG_LIBS="-lerl_interface $ERLANG_LIBS"
+       AC_SUBST(BUILD_WITH_ERLANG_CPPFLAGS)
+       AC_SUBST(BUILD_WITH_ERLANG_LDFLAGS)
+       AC_SUBST(BUILD_WITH_ERLANG_LIBS)
+fi
+AM_CONDITIONAL(BUILD_WITH_ERLANG, test "x$with_erlang" = "xyes")
+
+CPPFLAGS="$SAVE_CPPFLAGS"
+LDFLAGS="$SAVE_LDFLAGS"
+# }}}
+
 # --with-libesmtp {{{
 AC_ARG_WITH(libesmtp, [AS_HELP_STRING([--with-libesmtp@<:@=PREFIX@:>@], [Path to libesmtp.])],
 [
@@ -3853,6 +3921,7 @@ AC_PLUGIN([disk],        [$plugin_disk],       [Disk usage statistics])
 AC_PLUGIN([dns],         [$with_libpcap],      [DNS traffic analysis])
 AC_PLUGIN([email],       [yes],                [EMail statistics])
 AC_PLUGIN([entropy],     [$plugin_entropy],    [Entropy statistics])
+AC_PLUGIN([erlang],      [$with_erlang],       [Erlang interoperability])
 AC_PLUGIN([exec],        [yes],                [Execution of external programs])
 AC_PLUGIN([filecount],   [yes],                [Count files in directories])
 AC_PLUGIN([fscache],     [$plugin_fscache],    [fscache statistics])
@@ -4099,6 +4168,7 @@ Configuration:
   Libraries:
     libcurl . . . . . . . $with_libcurl
     libdbi  . . . . . . . $with_libdbi
+    liberl_interface  . . $with_erlang
     libesmtp  . . . . . . $with_libesmtp
     libganglia  . . . . . $with_libganglia
     libgcrypt . . . . . . $with_libgcrypt
@@ -4158,6 +4228,7 @@ Configuration:
     dns . . . . . . . . . $enable_dns
     email . . . . . . . . $enable_email
     entropy . . . . . . . $enable_entropy
+    erlang  . . . . . . . $enable_erlang
     exec  . . . . . . . . $enable_exec
     filecount . . . . . . $enable_filecount
     fscache . . . . . . . $enable_fscache
index d0cd99d..0cef460 100644 (file)
@@ -314,6 +314,16 @@ collectd_LDADD += "-dlopen" entropy.la
 collectd_DEPENDENCIES += entropy.la
 endif
 
+if BUILD_PLUGIN_ERLANG
+pkglib_LTLIBRARIES += erlang.la
+erlang_la_SOURCES = erlang.c
+erlang_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_ERLANG_CPPFLAGS)
+erlang_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_ERLANG_LDFLAGS)
+erlang_la_LIBADD = $(BUILD_WITH_ERLANG_LIBS)
+collectd_LDADD += "-dlopen" erlang.la
+collectd_DEPENDENCIES += erlang.la
+endif
+
 if BUILD_PLUGIN_EXEC
 pkglib_LTLIBRARIES += exec.la
 exec_la_SOURCES = exec.c \
diff --git a/src/erlang.c b/src/erlang.c
new file mode 100644 (file)
index 0000000..4b488b4
--- /dev/null
@@ -0,0 +1,416 @@
+/**
+ * collectd - src/erlang.c
+ * Copyright (C) 2009  Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+
+#include <pthread.h>
+
+#include <erl_interface.h>
+#include <ei.h>
+
+/* 
+ * Private data structures
+ */
+struct ce_connection_info_s
+{
+       int fd;
+       ErlConnect conn;
+};
+typedef struct ce_connection_info_s ce_connection_info_t;
+
+/*
+ * Private variables
+ */
+static pthread_t listen_thread_id;
+static _Bool     listen_thread_running = false;
+
+static char conf_service[NI_MAXSERV] = "29157";
+static char conf_cookie[256] = "ceisaequ";
+
+/*
+ * Private functions
+ */
+static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
+{
+       ETERM *reply;
+       int status;
+
+       reply = erl_mk_atom (atom);
+       if (reply == NULL)
+               return (ENOMEM);
+
+       status = erl_send (fd, to, reply);
+       erl_free_term (reply);
+
+       if (status == 1)
+               return (0);
+       else
+               return (erl_errno);
+} /* }}} int send_atom */
+
+static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
+{
+       ETERM *reply;
+       int status;
+
+       DEBUG ("erlang plugin: send_error: message = %s.", message);
+       reply = erl_format ("{~a,~s}", "error", message);
+
+       status = erl_send (fd, to, reply);
+       if (status != 1)
+               status = erl_errno;
+       else
+               status = 0;
+
+       erl_free_term (reply);
+
+       return (status);
+} /* }}} int send_error */
+
+/* Returns non-zero only if the request could not be handled gracefully. */
+static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
+               const ErlMessage *req)
+{
+       ETERM *vl;
+
+       vl = erl_element (2, req->msg);
+       if ((vl == NULL) || !ERL_IS_TUPLE (vl))
+       {
+               erl_free_term (vl);
+               send_error (cinfo->fd, req->from, "Invalid format: VL not a tubple.");
+               return (0);
+       }
+
+       /* We need: Identifier (5 parts), time, interval, values
+        * => 8 parts */
+       if (ERL_TUPLE_SIZE (vl) != 8)
+       {
+               erl_free_term (vl);
+               send_error (cinfo->fd, req->from, "Invalid format: "
+                               "VL needs eight components.");
+               return (0);
+       }
+
+       send_atom (cinfo->fd, req->from, "success");
+
+       return (0);
+} /* }}} int handle_dispatch_values */
+
+static void *handle_client_thread (void *arg) /* {{{ */
+{
+       ce_connection_info_t *cinfo;
+       ErlMessage emsg;
+       unsigned char buffer[4096];
+
+       cinfo = arg;
+
+       DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.",
+                       cinfo->fd, cinfo->conn.nodename);
+
+       emsg.from = NULL;
+       emsg.to = NULL;
+       emsg.msg = NULL;
+
+       while (42)
+       {
+               int status;
+
+               erl_free_term (emsg.from);
+               emsg.from = NULL;
+               erl_free_term (emsg.to);
+               emsg.to = NULL;
+               erl_free_term (emsg.msg);
+               emsg.msg = NULL;
+
+               status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg);
+               if (status == ERL_TICK)
+                       continue;
+
+               if (status == ERL_ERROR)
+                       break;
+
+               if (emsg.type == ERL_REG_SEND)
+               {
+                       ETERM *func;
+                       ETERM *reply;
+
+                       if (!ERL_IS_TUPLE (emsg.msg))
+                       {
+                               ERROR ("erlang plugin: Message is not a tuple.");
+                               send_atom (cinfo->fd, emsg.from, "error");
+                               continue;
+                       }
+
+                       func = erl_element (1, emsg.msg);
+                       if (!ERL_IS_ATOM (func))
+                       {
+                               ERROR ("erlang plugin: First element is not an atom!");
+                               send_atom (cinfo->fd, emsg.from, "error");
+                               erl_free_term (func);
+                               continue;
+                       }
+
+                       DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func));
+                       reply = NULL;
+                       if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
+                               status = handle_dispatch_values (cinfo, &emsg);
+                       else
+                       {
+                               ERROR ("erlang plugin: Received request for invalid function `%s'.",
+                                               ERL_ATOM_PTR (func));
+                               send_atom (cinfo->fd, emsg.from, "error");
+                               status = 0;
+                       }
+
+                       /* Check for fatal errors in the callback functions. */
+                       if (status != 0)
+                       {
+                               ERROR ("erlang plugin: Handling request for `%s' failed.",
+                                               ERL_ATOM_PTR (func));
+                               erl_free_term (func);
+                               break;
+                       }
+
+                       erl_free_term (func);
+               }
+               else if (emsg.type == ERL_EXIT)
+               {
+                       DEBUG ("erlang plugin: handle_client_thread[%i]: "
+                                       "Received exit message.", cinfo->fd);
+                       break;
+               }
+               else
+               {
+                       ERROR ("erlang plugin: Message type not handled: %i.", emsg.type);
+               }
+       } /* while (42) */
+
+       erl_free_term (emsg.from);
+       emsg.from = NULL;
+       erl_free_term (emsg.to);
+       emsg.to = NULL;
+       erl_free_term (emsg.msg);
+       emsg.msg = NULL;
+
+       DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd);
+
+       close (cinfo->fd);
+       free (cinfo);
+
+       pthread_exit ((void *) 0);
+       return ((void *) 0);
+} /* }}} void *handle_client_thread */
+
+static int create_listen_socket (void) /* {{{ */
+{
+       struct addrinfo ai_hints;
+       struct addrinfo *ai_list;
+       struct addrinfo *ai_ptr;
+       int sock_descr;
+       int status;
+       int numeric_serv;
+
+       sock_descr = -1;
+
+       memset (&ai_hints, 0, sizeof (ai_hints));
+       /* AI_PASSIVE => returns INADDR_ANY */
+       ai_hints.ai_flags = AI_PASSIVE;
+#ifdef AI_ADDRCONFIG
+       ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+       /* IPv4 only :( */
+       ai_hints.ai_family = AF_INET;
+       ai_hints.ai_socktype = SOCK_STREAM;
+
+       ai_list = NULL;
+       status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service,
+                       &ai_hints, &ai_list);
+       if (status != 0)
+       {
+               ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status));
+               return (-1);
+       }
+
+       for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+       {
+               struct sockaddr_in *sa_in;
+               struct in_addr *sin_addr;
+               int yes;
+
+               assert (ai_ptr->ai_family == AF_INET);
+               sa_in = (struct sockaddr_in *) ai_ptr->ai_addr;
+               sin_addr = &sa_in->sin_addr;
+               numeric_serv = (int) ntohs (sa_in->sin_port);
+
+               /* Dunno if calling this multiple times is legal. Since it wants to have
+                * the sin_addr for some reason this is the best place to call this,
+                * though. -octo */
+               status = erl_connect_xinit (/* host name = */ "leeloo",
+                               /* plain node name = */ "collectd",
+                               /* full node name  = */ "collectd@leeloo.lan.home.verplant.org",
+                               /* our address     = */ sin_addr,
+                               /* secret cookie   = */ conf_cookie,
+                               /* instance number = */ 0);
+               if (status < 0)
+               {
+                       ERROR ("erlang plugin: erl_connect_xinit failed with status %i.",
+                                       status);
+                       continue;
+               }
+
+               sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
+                               ai_ptr->ai_protocol);
+               if (sock_descr < 0)
+               {
+                       ERROR ("erlang plugin: socket(2) failed.");
+                       continue;
+               }
+
+               yes = 1;
+               status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
+               if (status != 0)
+               {
+                       ERROR ("erlang plugin: setsockopt(2) failed.");
+                       close (sock_descr);
+                       sock_descr = -1;
+                       continue;
+               }
+
+               status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+               if (status != 0)
+               {
+                       ERROR ("erlang plugin: bind(2) failed.");
+                       close (sock_descr);
+                       sock_descr = -1;
+                       continue;
+               }
+
+               status = listen (sock_descr, /* backlog = */ 10);
+               if (status != 0)
+               {
+                       ERROR ("erlang plugin: listen(2) failed.");
+                       close (sock_descr);
+                       sock_descr = -1;
+                       continue;
+               }
+
+               break;
+       } /* for (ai_list) */
+
+       freeaddrinfo (ai_list);
+
+       if (sock_descr >= 0)
+       {
+               status = erl_publish (numeric_serv);
+               if (status < 0)
+               {
+                       ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
+                       close (sock_descr);
+                       sock_descr = -1;
+                       return (-1);
+               }
+       }
+
+       return (sock_descr);
+} /* }}} int create_listen_socket */
+
+void *listen_thread (void *arg) /* {{{ */
+{
+       int listen;
+       int fd;
+
+       ErlConnect conn;
+
+       /* I have no fucking idea what this does, nor what the arguments are. Didn't
+        * find any comprehensive docs yet. */
+       erl_init (/* void *x = */ NULL, /* long y = */ 0);
+
+       listen = create_listen_socket ();
+       if (listen < 0)
+               exit (EXIT_FAILURE);
+
+       while (42)
+       {
+               pthread_t tid;
+               pthread_attr_t tattr;
+               ce_connection_info_t *arg;
+
+               fd = erl_accept (listen, &conn);
+               if (fd < 0)
+               {
+                       ERROR ("erlang plugin: erl_accept failed with status %i.", fd);
+                       close (listen);
+                       exit (EXIT_FAILURE);
+               }
+               DEBUG ("erlang plugin: Got connection from %s on fd %i.",
+                               conn.nodename, fd);
+
+               pthread_attr_init (&tattr);
+               pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED);
+
+               arg = malloc (sizeof (*arg));
+               if (arg == NULL)
+               {
+                       ERROR ("erlang plugin: malloc failed.");
+                       close (fd);
+                       continue;
+               }
+               memset (arg, 0, sizeof (*arg));
+
+               arg->fd = fd;
+               memcpy (&arg->conn, &conn, sizeof (conn));
+
+               pthread_create (&tid, &tattr, handle_client_thread, arg);
+       } /* while (42) */
+
+       pthread_exit ((void *) 0);
+       return ((void *) 0);
+} /* }}} void *listen_thread */
+
+static int ce_init (void) /* {{{ */
+{
+       if (!listen_thread_running)
+       {
+               int status;
+
+               status = pthread_create (&listen_thread_id,
+                               /* attr = */ NULL,
+                               listen_thread,
+                               /* args = */ NULL);
+               if (status == 0)
+                       listen_thread_running = true;
+       }
+
+       return (0);
+} /* }}} int ce_init */
+
+void module_register (void)
+{
+       plugin_register_init ("erlang", ce_init);
+}
+
+/* vim: set sw=2 ts=2 noet fdm=marker : */