2 * collectd - src/erlang.c
3 * Copyright (C) 2009 Florian octo Forster
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Florian octo Forster <octo at verplant.org>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
32 #include <erl_interface.h>
36 * Private data structures
38 struct ce_connection_info_s
43 typedef struct ce_connection_info_s ce_connection_info_t;
48 static pthread_t listen_thread_id;
49 static _Bool listen_thread_running = false;
51 static char conf_service[NI_MAXSERV] = "29157";
52 static char conf_cookie[256] = "ceisaequ";
57 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
62 reply = erl_mk_atom (atom);
66 status = erl_send (fd, to, reply);
67 erl_free_term (reply);
73 } /* }}} int send_atom */
75 static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
80 DEBUG ("erlang plugin: send_error: message = %s.", message);
81 reply = erl_format ("{~a,~s}", "error", message);
83 status = erl_send (fd, to, reply);
89 erl_free_term (reply);
92 } /* }}} int send_error */
94 /* Returns non-zero only if the request could not be handled gracefully. */
95 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
96 const ErlMessage *req)
100 vl = erl_element (2, req->msg);
101 if ((vl == NULL) || !ERL_IS_TUPLE (vl))
104 send_error (cinfo->fd, req->from, "Invalid format: VL not a tubple.");
108 /* We need: Identifier (5 parts), time, interval, values
110 if (ERL_TUPLE_SIZE (vl) != 8)
113 send_error (cinfo->fd, req->from, "Invalid format: "
114 "VL needs eight components.");
118 send_atom (cinfo->fd, req->from, "success");
121 } /* }}} int handle_dispatch_values */
123 static void *handle_client_thread (void *arg) /* {{{ */
125 ce_connection_info_t *cinfo;
127 unsigned char buffer[4096];
131 DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.",
132 cinfo->fd, cinfo->conn.nodename);
142 erl_free_term (emsg.from);
144 erl_free_term (emsg.to);
146 erl_free_term (emsg.msg);
149 status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg);
150 if (status == ERL_TICK)
153 if (status == ERL_ERROR)
156 if (emsg.type == ERL_REG_SEND)
161 if (!ERL_IS_TUPLE (emsg.msg))
163 ERROR ("erlang plugin: Message is not a tuple.");
164 send_atom (cinfo->fd, emsg.from, "error");
168 func = erl_element (1, emsg.msg);
169 if (!ERL_IS_ATOM (func))
171 ERROR ("erlang plugin: First element is not an atom!");
172 send_atom (cinfo->fd, emsg.from, "error");
173 erl_free_term (func);
177 DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func));
179 if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
180 status = handle_dispatch_values (cinfo, &emsg);
183 ERROR ("erlang plugin: Received request for invalid function `%s'.",
184 ERL_ATOM_PTR (func));
185 send_atom (cinfo->fd, emsg.from, "error");
189 /* Check for fatal errors in the callback functions. */
192 ERROR ("erlang plugin: Handling request for `%s' failed.",
193 ERL_ATOM_PTR (func));
194 erl_free_term (func);
198 erl_free_term (func);
200 else if (emsg.type == ERL_EXIT)
202 DEBUG ("erlang plugin: handle_client_thread[%i]: "
203 "Received exit message.", cinfo->fd);
208 ERROR ("erlang plugin: Message type not handled: %i.", emsg.type);
212 erl_free_term (emsg.from);
214 erl_free_term (emsg.to);
216 erl_free_term (emsg.msg);
219 DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd);
224 pthread_exit ((void *) 0);
226 } /* }}} void *handle_client_thread */
228 static int create_listen_socket (void) /* {{{ */
230 struct addrinfo ai_hints;
231 struct addrinfo *ai_list;
232 struct addrinfo *ai_ptr;
239 memset (&ai_hints, 0, sizeof (ai_hints));
240 /* AI_PASSIVE => returns INADDR_ANY */
241 ai_hints.ai_flags = AI_PASSIVE;
243 ai_hints.ai_flags |= AI_ADDRCONFIG;
246 ai_hints.ai_family = AF_INET;
247 ai_hints.ai_socktype = SOCK_STREAM;
250 status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service,
251 &ai_hints, &ai_list);
254 ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status));
258 for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
260 struct sockaddr_in *sa_in;
261 struct in_addr *sin_addr;
264 assert (ai_ptr->ai_family == AF_INET);
265 sa_in = (struct sockaddr_in *) ai_ptr->ai_addr;
266 sin_addr = &sa_in->sin_addr;
267 numeric_serv = (int) ntohs (sa_in->sin_port);
269 /* Dunno if calling this multiple times is legal. Since it wants to have
270 * the sin_addr for some reason this is the best place to call this,
272 status = erl_connect_xinit (/* host name = */ "leeloo",
273 /* plain node name = */ "collectd",
274 /* full node name = */ "collectd@leeloo.lan.home.verplant.org",
275 /* our address = */ sin_addr,
276 /* secret cookie = */ conf_cookie,
277 /* instance number = */ 0);
280 ERROR ("erlang plugin: erl_connect_xinit failed with status %i.",
285 sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
286 ai_ptr->ai_protocol);
289 ERROR ("erlang plugin: socket(2) failed.");
294 status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
297 ERROR ("erlang plugin: setsockopt(2) failed.");
303 status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
306 ERROR ("erlang plugin: bind(2) failed.");
312 status = listen (sock_descr, /* backlog = */ 10);
315 ERROR ("erlang plugin: listen(2) failed.");
322 } /* for (ai_list) */
324 freeaddrinfo (ai_list);
328 status = erl_publish (numeric_serv);
331 ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
339 } /* }}} int create_listen_socket */
341 void *listen_thread (void *arg) /* {{{ */
348 /* I have no fucking idea what this does, nor what the arguments are. Didn't
349 * find any comprehensive docs yet. */
350 erl_init (/* void *x = */ NULL, /* long y = */ 0);
352 listen = create_listen_socket ();
359 pthread_attr_t tattr;
360 ce_connection_info_t *arg;
362 fd = erl_accept (listen, &conn);
365 ERROR ("erlang plugin: erl_accept failed with status %i.", fd);
369 DEBUG ("erlang plugin: Got connection from %s on fd %i.",
372 pthread_attr_init (&tattr);
373 pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED);
375 arg = malloc (sizeof (*arg));
378 ERROR ("erlang plugin: malloc failed.");
382 memset (arg, 0, sizeof (*arg));
385 memcpy (&arg->conn, &conn, sizeof (conn));
387 pthread_create (&tid, &tattr, handle_client_thread, arg);
390 pthread_exit ((void *) 0);
392 } /* }}} void *listen_thread */
394 static int ce_init (void) /* {{{ */
396 if (!listen_thread_running)
400 status = pthread_create (&listen_thread_id,
405 listen_thread_running = true;
409 } /* }}} int ce_init */
411 void module_register (void)
413 plugin_register_init ("erlang", ce_init);
416 /* vim: set sw=2 ts=2 noet fdm=marker : */