erlang plugin: Add some proof-of-concept code for Erlang interoperability.
[collectd.git] / src / erlang.c
1 /**
2  * collectd - src/erlang.c
3  * Copyright (C) 2009  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <netdb.h>
29
30 #include <pthread.h>
31
32 #include <erl_interface.h>
33 #include <ei.h>
34
35 /* 
36  * Private data structures
37  */
38 struct ce_connection_info_s
39 {
40         int fd;
41         ErlConnect conn;
42 };
43 typedef struct ce_connection_info_s ce_connection_info_t;
44
45 /*
46  * Private variables
47  */
48 static pthread_t listen_thread_id;
49 static _Bool     listen_thread_running = false;
50
51 static char conf_service[NI_MAXSERV] = "29157";
52 static char conf_cookie[256] = "ceisaequ";
53
54 /*
55  * Private functions
56  */
57 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
58 {
59         ETERM *reply;
60         int status;
61
62         reply = erl_mk_atom (atom);
63         if (reply == NULL)
64                 return (ENOMEM);
65
66         status = erl_send (fd, to, reply);
67         erl_free_term (reply);
68
69         if (status == 1)
70                 return (0);
71         else
72                 return (erl_errno);
73 } /* }}} int send_atom */
74
75 static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
76 {
77         ETERM *reply;
78         int status;
79
80         DEBUG ("erlang plugin: send_error: message = %s.", message);
81         reply = erl_format ("{~a,~s}", "error", message);
82
83         status = erl_send (fd, to, reply);
84         if (status != 1)
85                 status = erl_errno;
86         else
87                 status = 0;
88
89         erl_free_term (reply);
90
91         return (status);
92 } /* }}} int send_error */
93
94 /* Returns non-zero only if the request could not be handled gracefully. */
95 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
96                 const ErlMessage *req)
97 {
98         ETERM *vl;
99
100         vl = erl_element (2, req->msg);
101         if ((vl == NULL) || !ERL_IS_TUPLE (vl))
102         {
103                 erl_free_term (vl);
104                 send_error (cinfo->fd, req->from, "Invalid format: VL not a tubple.");
105                 return (0);
106         }
107
108         /* We need: Identifier (5 parts), time, interval, values
109          * => 8 parts */
110         if (ERL_TUPLE_SIZE (vl) != 8)
111         {
112                 erl_free_term (vl);
113                 send_error (cinfo->fd, req->from, "Invalid format: "
114                                 "VL needs eight components.");
115                 return (0);
116         }
117
118         send_atom (cinfo->fd, req->from, "success");
119
120         return (0);
121 } /* }}} int handle_dispatch_values */
122
123 static void *handle_client_thread (void *arg) /* {{{ */
124 {
125         ce_connection_info_t *cinfo;
126         ErlMessage emsg;
127         unsigned char buffer[4096];
128
129         cinfo = arg;
130
131         DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.",
132                         cinfo->fd, cinfo->conn.nodename);
133
134         emsg.from = NULL;
135         emsg.to = NULL;
136         emsg.msg = NULL;
137
138         while (42)
139         {
140                 int status;
141
142                 erl_free_term (emsg.from);
143                 emsg.from = NULL;
144                 erl_free_term (emsg.to);
145                 emsg.to = NULL;
146                 erl_free_term (emsg.msg);
147                 emsg.msg = NULL;
148
149                 status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg);
150                 if (status == ERL_TICK)
151                         continue;
152
153                 if (status == ERL_ERROR)
154                         break;
155
156                 if (emsg.type == ERL_REG_SEND)
157                 {
158                         ETERM *func;
159                         ETERM *reply;
160
161                         if (!ERL_IS_TUPLE (emsg.msg))
162                         {
163                                 ERROR ("erlang plugin: Message is not a tuple.");
164                                 send_atom (cinfo->fd, emsg.from, "error");
165                                 continue;
166                         }
167
168                         func = erl_element (1, emsg.msg);
169                         if (!ERL_IS_ATOM (func))
170                         {
171                                 ERROR ("erlang plugin: First element is not an atom!");
172                                 send_atom (cinfo->fd, emsg.from, "error");
173                                 erl_free_term (func);
174                                 continue;
175                         }
176
177                         DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func));
178                         reply = NULL;
179                         if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
180                                 status = handle_dispatch_values (cinfo, &emsg);
181                         else
182                         {
183                                 ERROR ("erlang plugin: Received request for invalid function `%s'.",
184                                                 ERL_ATOM_PTR (func));
185                                 send_atom (cinfo->fd, emsg.from, "error");
186                                 status = 0;
187                         }
188
189                         /* Check for fatal errors in the callback functions. */
190                         if (status != 0)
191                         {
192                                 ERROR ("erlang plugin: Handling request for `%s' failed.",
193                                                 ERL_ATOM_PTR (func));
194                                 erl_free_term (func);
195                                 break;
196                         }
197
198                         erl_free_term (func);
199                 }
200                 else if (emsg.type == ERL_EXIT)
201                 {
202                         DEBUG ("erlang plugin: handle_client_thread[%i]: "
203                                         "Received exit message.", cinfo->fd);
204                         break;
205                 }
206                 else
207                 {
208                         ERROR ("erlang plugin: Message type not handled: %i.", emsg.type);
209                 }
210         } /* while (42) */
211
212         erl_free_term (emsg.from);
213         emsg.from = NULL;
214         erl_free_term (emsg.to);
215         emsg.to = NULL;
216         erl_free_term (emsg.msg);
217         emsg.msg = NULL;
218
219         DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd);
220
221         close (cinfo->fd);
222         free (cinfo);
223
224         pthread_exit ((void *) 0);
225         return ((void *) 0);
226 } /* }}} void *handle_client_thread */
227
228 static int create_listen_socket (void) /* {{{ */
229 {
230         struct addrinfo ai_hints;
231         struct addrinfo *ai_list;
232         struct addrinfo *ai_ptr;
233         int sock_descr;
234         int status;
235         int numeric_serv;
236
237         sock_descr = -1;
238
239         memset (&ai_hints, 0, sizeof (ai_hints));
240         /* AI_PASSIVE => returns INADDR_ANY */
241         ai_hints.ai_flags = AI_PASSIVE;
242 #ifdef AI_ADDRCONFIG
243         ai_hints.ai_flags |= AI_ADDRCONFIG;
244 #endif
245         /* IPv4 only :( */
246         ai_hints.ai_family = AF_INET;
247         ai_hints.ai_socktype = SOCK_STREAM;
248
249         ai_list = NULL;
250         status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service,
251                         &ai_hints, &ai_list);
252         if (status != 0)
253         {
254                 ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status));
255                 return (-1);
256         }
257
258         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
259         {
260                 struct sockaddr_in *sa_in;
261                 struct in_addr *sin_addr;
262                 int yes;
263
264                 assert (ai_ptr->ai_family == AF_INET);
265                 sa_in = (struct sockaddr_in *) ai_ptr->ai_addr;
266                 sin_addr = &sa_in->sin_addr;
267                 numeric_serv = (int) ntohs (sa_in->sin_port);
268
269                 /* Dunno if calling this multiple times is legal. Since it wants to have
270                  * the sin_addr for some reason this is the best place to call this,
271                  * though. -octo */
272                 status = erl_connect_xinit (/* host name = */ "leeloo",
273                                 /* plain node name = */ "collectd",
274                                 /* full node name  = */ "collectd@leeloo.lan.home.verplant.org",
275                                 /* our address     = */ sin_addr,
276                                 /* secret cookie   = */ conf_cookie,
277                                 /* instance number = */ 0);
278                 if (status < 0)
279                 {
280                         ERROR ("erlang plugin: erl_connect_xinit failed with status %i.",
281                                         status);
282                         continue;
283                 }
284
285                 sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
286                                 ai_ptr->ai_protocol);
287                 if (sock_descr < 0)
288                 {
289                         ERROR ("erlang plugin: socket(2) failed.");
290                         continue;
291                 }
292
293                 yes = 1;
294                 status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
295                 if (status != 0)
296                 {
297                         ERROR ("erlang plugin: setsockopt(2) failed.");
298                         close (sock_descr);
299                         sock_descr = -1;
300                         continue;
301                 }
302
303                 status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
304                 if (status != 0)
305                 {
306                         ERROR ("erlang plugin: bind(2) failed.");
307                         close (sock_descr);
308                         sock_descr = -1;
309                         continue;
310                 }
311
312                 status = listen (sock_descr, /* backlog = */ 10);
313                 if (status != 0)
314                 {
315                         ERROR ("erlang plugin: listen(2) failed.");
316                         close (sock_descr);
317                         sock_descr = -1;
318                         continue;
319                 }
320
321                 break;
322         } /* for (ai_list) */
323
324         freeaddrinfo (ai_list);
325
326         if (sock_descr >= 0)
327         {
328                 status = erl_publish (numeric_serv);
329                 if (status < 0)
330                 {
331                         ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
332                         close (sock_descr);
333                         sock_descr = -1;
334                         return (-1);
335                 }
336         }
337
338         return (sock_descr);
339 } /* }}} int create_listen_socket */
340
341 void *listen_thread (void *arg) /* {{{ */
342 {
343         int listen;
344         int fd;
345
346         ErlConnect conn;
347
348         /* I have no fucking idea what this does, nor what the arguments are. Didn't
349          * find any comprehensive docs yet. */
350         erl_init (/* void *x = */ NULL, /* long y = */ 0);
351
352         listen = create_listen_socket ();
353         if (listen < 0)
354                 exit (EXIT_FAILURE);
355
356         while (42)
357         {
358                 pthread_t tid;
359                 pthread_attr_t tattr;
360                 ce_connection_info_t *arg;
361
362                 fd = erl_accept (listen, &conn);
363                 if (fd < 0)
364                 {
365                         ERROR ("erlang plugin: erl_accept failed with status %i.", fd);
366                         close (listen);
367                         exit (EXIT_FAILURE);
368                 }
369                 DEBUG ("erlang plugin: Got connection from %s on fd %i.",
370                                 conn.nodename, fd);
371
372                 pthread_attr_init (&tattr);
373                 pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED);
374
375                 arg = malloc (sizeof (*arg));
376                 if (arg == NULL)
377                 {
378                         ERROR ("erlang plugin: malloc failed.");
379                         close (fd);
380                         continue;
381                 }
382                 memset (arg, 0, sizeof (*arg));
383
384                 arg->fd = fd;
385                 memcpy (&arg->conn, &conn, sizeof (conn));
386
387                 pthread_create (&tid, &tattr, handle_client_thread, arg);
388         } /* while (42) */
389
390         pthread_exit ((void *) 0);
391         return ((void *) 0);
392 } /* }}} void *listen_thread */
393
394 static int ce_init (void) /* {{{ */
395 {
396         if (!listen_thread_running)
397         {
398                 int status;
399
400                 status = pthread_create (&listen_thread_id,
401                                 /* attr = */ NULL,
402                                 listen_thread,
403                                 /* args = */ NULL);
404                 if (status == 0)
405                         listen_thread_running = true;
406         }
407
408         return (0);
409 } /* }}} int ce_init */
410
411 void module_register (void)
412 {
413         plugin_register_init ("erlang", ce_init);
414 }
415
416 /* vim: set sw=2 ts=2 noet fdm=marker : */