From 71f9dc1cf6d6aa8d5dd52e89588771bfe57c90a9 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 8 Oct 2009 19:40:34 +0200 Subject: [PATCH] erlang plugin: Add some proof-of-concept code for Erlang interoperability. It doesn't do anything useful yet, but it's possible to call C functions from Erlang. --- configure.in | 71 ++++++++++ src/Makefile.am | 10 ++ src/erlang.c | 416 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 497 insertions(+) create mode 100644 src/erlang.c diff --git a/configure.in b/configure.in index d21df3cd..fa463dd8 100644 --- a/configure.in +++ b/configure.in @@ -1245,6 +1245,74 @@ fi AM_CONDITIONAL(BUILD_WITH_LIBDBI, test "x$with_libdbi" = "xyes") # }}} +# --with-erlang {{{ +ERLANG_CPPFLAGS="$ERLANG_CPPFLAGS" +ERLANG_LDFLAGS="$ERLANG_LDFLAGS" +ERLANG_LIBS="$ERLANG_LIBS" +AC_ARG_WITH(erlang, [AS_HELP_STRING([--with-erlang@<:@=PREFIX@:>@], [Path to Erlang / erl_interface.])], +[ + if test "x$withval" != "xno" && test "x$withval" != "xyes" + then + LDFLAGS="$LDFLAGS -L$withval/lib" + CPPFLAGS="$CPPFLAGS -I$withval/include -D_THREAD_SAFE" + with_erlang="yes" + else + with_erlang="$withval" + fi +], +[ + with_erlang="yes" +]) +if test "x$ERLANG_CPPFLAGS" = "x" +then + ERLANG_CPPFLAGS="-D_REENTRANT" +fi +if test "x$ERLANG_LIBS" = "x" +then + ERLANG_LIBS="-lei -lpthread" +fi + +SAVE_CPPFLAGS="$CPPFLAGS" +SAVE_LDFLAGS="$LDFLAGS" +CPPFLAGS="$CPPFLAGS $ERLANG_CPPFLAGS" +LDFLAGS="$LDFLAGS $ERLANG_LDFLAGS" + +if test "x$with_erlang" = "xyes" +then + if test "x$ERLANG_CPPFLAGS" != "x" + then + AC_MSG_NOTICE([Erlang CPPFLAGS: $ERLANG_CPPFLAGS]) + fi + AC_CHECK_HEADERS(erl_interface.h ei.h, + [with_erlang="yes"], + [with_erlang="no (Headers not found)"]) +fi +if test "x$with_erlang" = "xyes" +then + if test "x$ERLANG_LDFLAGS" != "x" + then + AC_MSG_NOTICE([Erlang LDFLAGS: $ERLANG_LDFLAGS]) + fi + AC_CHECK_LIB(erl_interface, erl_connect_xinit, + [with_erlang="yes"], + [with_erlang="no (Symbol erl_connect_xinit not found)"], + [$ERLANG_LIBS]) +fi +if test "x$with_erlang" = "xyes" +then + BUILD_WITH_ERLANG_CPPFLAGS="$ERLANG_CPPFLAGS" + BUILD_WITH_ERLANG_LDFLAGS="$ERLANG_LDFLAGS" + BUILD_WITH_ERLANG_LIBS="-lerl_interface $ERLANG_LIBS" + AC_SUBST(BUILD_WITH_ERLANG_CPPFLAGS) + AC_SUBST(BUILD_WITH_ERLANG_LDFLAGS) + AC_SUBST(BUILD_WITH_ERLANG_LIBS) +fi +AM_CONDITIONAL(BUILD_WITH_ERLANG, test "x$with_erlang" = "xyes") + +CPPFLAGS="$SAVE_CPPFLAGS" +LDFLAGS="$SAVE_LDFLAGS" +# }}} + # --with-libesmtp {{{ AC_ARG_WITH(libesmtp, [AS_HELP_STRING([--with-libesmtp@<:@=PREFIX@:>@], [Path to libesmtp.])], [ @@ -3853,6 +3921,7 @@ AC_PLUGIN([disk], [$plugin_disk], [Disk usage statistics]) AC_PLUGIN([dns], [$with_libpcap], [DNS traffic analysis]) AC_PLUGIN([email], [yes], [EMail statistics]) AC_PLUGIN([entropy], [$plugin_entropy], [Entropy statistics]) +AC_PLUGIN([erlang], [$with_erlang], [Erlang interoperability]) AC_PLUGIN([exec], [yes], [Execution of external programs]) AC_PLUGIN([filecount], [yes], [Count files in directories]) AC_PLUGIN([fscache], [$plugin_fscache], [fscache statistics]) @@ -4099,6 +4168,7 @@ Configuration: Libraries: libcurl . . . . . . . $with_libcurl libdbi . . . . . . . $with_libdbi + liberl_interface . . $with_erlang libesmtp . . . . . . $with_libesmtp libganglia . . . . . $with_libganglia libgcrypt . . . . . . $with_libgcrypt @@ -4158,6 +4228,7 @@ Configuration: dns . . . . . . . . . $enable_dns email . . . . . . . . $enable_email entropy . . . . . . . $enable_entropy + erlang . . . . . . . $enable_erlang exec . . . . . . . . $enable_exec filecount . . . . . . $enable_filecount fscache . . . . . . . $enable_fscache diff --git a/src/Makefile.am b/src/Makefile.am index d0cd99d6..0cef4609 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -314,6 +314,16 @@ collectd_LDADD += "-dlopen" entropy.la collectd_DEPENDENCIES += entropy.la endif +if BUILD_PLUGIN_ERLANG +pkglib_LTLIBRARIES += erlang.la +erlang_la_SOURCES = erlang.c +erlang_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_ERLANG_CPPFLAGS) +erlang_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_ERLANG_LDFLAGS) +erlang_la_LIBADD = $(BUILD_WITH_ERLANG_LIBS) +collectd_LDADD += "-dlopen" erlang.la +collectd_DEPENDENCIES += erlang.la +endif + if BUILD_PLUGIN_EXEC pkglib_LTLIBRARIES += exec.la exec_la_SOURCES = exec.c \ diff --git a/src/erlang.c b/src/erlang.c new file mode 100644 index 00000000..4b488b4e --- /dev/null +++ b/src/erlang.c @@ -0,0 +1,416 @@ +/** + * collectd - src/erlang.c + * Copyright (C) 2009 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#include "collectd.h" +#include "plugin.h" + +#include +#include +#include +#include + +#include + +#include +#include + +/* + * Private data structures + */ +struct ce_connection_info_s +{ + int fd; + ErlConnect conn; +}; +typedef struct ce_connection_info_s ce_connection_info_t; + +/* + * Private variables + */ +static pthread_t listen_thread_id; +static _Bool listen_thread_running = false; + +static char conf_service[NI_MAXSERV] = "29157"; +static char conf_cookie[256] = "ceisaequ"; + +/* + * Private functions + */ +static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */ +{ + ETERM *reply; + int status; + + reply = erl_mk_atom (atom); + if (reply == NULL) + return (ENOMEM); + + status = erl_send (fd, to, reply); + erl_free_term (reply); + + if (status == 1) + return (0); + else + return (erl_errno); +} /* }}} int send_atom */ + +static int send_error (int fd, ETERM *to, const char *message) /* {{{ */ +{ + ETERM *reply; + int status; + + DEBUG ("erlang plugin: send_error: message = %s.", message); + reply = erl_format ("{~a,~s}", "error", message); + + status = erl_send (fd, to, reply); + if (status != 1) + status = erl_errno; + else + status = 0; + + erl_free_term (reply); + + return (status); +} /* }}} int send_error */ + +/* Returns non-zero only if the request could not be handled gracefully. */ +static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */ + const ErlMessage *req) +{ + ETERM *vl; + + vl = erl_element (2, req->msg); + if ((vl == NULL) || !ERL_IS_TUPLE (vl)) + { + erl_free_term (vl); + send_error (cinfo->fd, req->from, "Invalid format: VL not a tubple."); + return (0); + } + + /* We need: Identifier (5 parts), time, interval, values + * => 8 parts */ + if (ERL_TUPLE_SIZE (vl) != 8) + { + erl_free_term (vl); + send_error (cinfo->fd, req->from, "Invalid format: " + "VL needs eight components."); + return (0); + } + + send_atom (cinfo->fd, req->from, "success"); + + return (0); +} /* }}} int handle_dispatch_values */ + +static void *handle_client_thread (void *arg) /* {{{ */ +{ + ce_connection_info_t *cinfo; + ErlMessage emsg; + unsigned char buffer[4096]; + + cinfo = arg; + + DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.", + cinfo->fd, cinfo->conn.nodename); + + emsg.from = NULL; + emsg.to = NULL; + emsg.msg = NULL; + + while (42) + { + int status; + + erl_free_term (emsg.from); + emsg.from = NULL; + erl_free_term (emsg.to); + emsg.to = NULL; + erl_free_term (emsg.msg); + emsg.msg = NULL; + + status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg); + if (status == ERL_TICK) + continue; + + if (status == ERL_ERROR) + break; + + if (emsg.type == ERL_REG_SEND) + { + ETERM *func; + ETERM *reply; + + if (!ERL_IS_TUPLE (emsg.msg)) + { + ERROR ("erlang plugin: Message is not a tuple."); + send_atom (cinfo->fd, emsg.from, "error"); + continue; + } + + func = erl_element (1, emsg.msg); + if (!ERL_IS_ATOM (func)) + { + ERROR ("erlang plugin: First element is not an atom!"); + send_atom (cinfo->fd, emsg.from, "error"); + erl_free_term (func); + continue; + } + + DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func)); + reply = NULL; + if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0) + status = handle_dispatch_values (cinfo, &emsg); + else + { + ERROR ("erlang plugin: Received request for invalid function `%s'.", + ERL_ATOM_PTR (func)); + send_atom (cinfo->fd, emsg.from, "error"); + status = 0; + } + + /* Check for fatal errors in the callback functions. */ + if (status != 0) + { + ERROR ("erlang plugin: Handling request for `%s' failed.", + ERL_ATOM_PTR (func)); + erl_free_term (func); + break; + } + + erl_free_term (func); + } + else if (emsg.type == ERL_EXIT) + { + DEBUG ("erlang plugin: handle_client_thread[%i]: " + "Received exit message.", cinfo->fd); + break; + } + else + { + ERROR ("erlang plugin: Message type not handled: %i.", emsg.type); + } + } /* while (42) */ + + erl_free_term (emsg.from); + emsg.from = NULL; + erl_free_term (emsg.to); + emsg.to = NULL; + erl_free_term (emsg.msg); + emsg.msg = NULL; + + DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd); + + close (cinfo->fd); + free (cinfo); + + pthread_exit ((void *) 0); + return ((void *) 0); +} /* }}} void *handle_client_thread */ + +static int create_listen_socket (void) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + int sock_descr; + int status; + int numeric_serv; + + sock_descr = -1; + + memset (&ai_hints, 0, sizeof (ai_hints)); + /* AI_PASSIVE => returns INADDR_ANY */ + ai_hints.ai_flags = AI_PASSIVE; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + /* IPv4 only :( */ + ai_hints.ai_family = AF_INET; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_list = NULL; + status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service, + &ai_hints, &ai_list); + if (status != 0) + { + ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status)); + return (-1); + } + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + struct sockaddr_in *sa_in; + struct in_addr *sin_addr; + int yes; + + assert (ai_ptr->ai_family == AF_INET); + sa_in = (struct sockaddr_in *) ai_ptr->ai_addr; + sin_addr = &sa_in->sin_addr; + numeric_serv = (int) ntohs (sa_in->sin_port); + + /* Dunno if calling this multiple times is legal. Since it wants to have + * the sin_addr for some reason this is the best place to call this, + * though. -octo */ + status = erl_connect_xinit (/* host name = */ "leeloo", + /* plain node name = */ "collectd", + /* full node name = */ "collectd@leeloo.lan.home.verplant.org", + /* our address = */ sin_addr, + /* secret cookie = */ conf_cookie, + /* instance number = */ 0); + if (status < 0) + { + ERROR ("erlang plugin: erl_connect_xinit failed with status %i.", + status); + continue; + } + + sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (sock_descr < 0) + { + ERROR ("erlang plugin: socket(2) failed."); + continue; + } + + yes = 1; + status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + if (status != 0) + { + ERROR ("erlang plugin: setsockopt(2) failed."); + close (sock_descr); + sock_descr = -1; + continue; + } + + status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + ERROR ("erlang plugin: bind(2) failed."); + close (sock_descr); + sock_descr = -1; + continue; + } + + status = listen (sock_descr, /* backlog = */ 10); + if (status != 0) + { + ERROR ("erlang plugin: listen(2) failed."); + close (sock_descr); + sock_descr = -1; + continue; + } + + break; + } /* for (ai_list) */ + + freeaddrinfo (ai_list); + + if (sock_descr >= 0) + { + status = erl_publish (numeric_serv); + if (status < 0) + { + ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status); + close (sock_descr); + sock_descr = -1; + return (-1); + } + } + + return (sock_descr); +} /* }}} int create_listen_socket */ + +void *listen_thread (void *arg) /* {{{ */ +{ + int listen; + int fd; + + ErlConnect conn; + + /* I have no fucking idea what this does, nor what the arguments are. Didn't + * find any comprehensive docs yet. */ + erl_init (/* void *x = */ NULL, /* long y = */ 0); + + listen = create_listen_socket (); + if (listen < 0) + exit (EXIT_FAILURE); + + while (42) + { + pthread_t tid; + pthread_attr_t tattr; + ce_connection_info_t *arg; + + fd = erl_accept (listen, &conn); + if (fd < 0) + { + ERROR ("erlang plugin: erl_accept failed with status %i.", fd); + close (listen); + exit (EXIT_FAILURE); + } + DEBUG ("erlang plugin: Got connection from %s on fd %i.", + conn.nodename, fd); + + pthread_attr_init (&tattr); + pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED); + + arg = malloc (sizeof (*arg)); + if (arg == NULL) + { + ERROR ("erlang plugin: malloc failed."); + close (fd); + continue; + } + memset (arg, 0, sizeof (*arg)); + + arg->fd = fd; + memcpy (&arg->conn, &conn, sizeof (conn)); + + pthread_create (&tid, &tattr, handle_client_thread, arg); + } /* while (42) */ + + pthread_exit ((void *) 0); + return ((void *) 0); +} /* }}} void *listen_thread */ + +static int ce_init (void) /* {{{ */ +{ + if (!listen_thread_running) + { + int status; + + status = pthread_create (&listen_thread_id, + /* attr = */ NULL, + listen_thread, + /* args = */ NULL); + if (status == 0) + listen_thread_running = true; + } + + return (0); +} /* }}} int ce_init */ + +void module_register (void) +{ + plugin_register_init ("erlang", ce_init); +} + +/* vim: set sw=2 ts=2 noet fdm=marker : */ -- 2.11.0