erlang plugin: Don't exit if "erl_accept" indicates an error.
[collectd.git] / src / erlang.c
1 /**
2  * collectd - src/erlang.c
3  * Copyright (C) 2009  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "common.h"
24 #include "plugin.h"
25
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <netdb.h>
30
31 #include <pthread.h>
32
33 #include <erl_interface.h>
34 #include <ei.h>
35
36 /* 
37  * Private data structures
38  */
39 struct ce_connection_info_s
40 {
41         int fd;
42         ErlConnect conn;
43 };
44 typedef struct ce_connection_info_s ce_connection_info_t;
45
46 struct ce_callback_info_s
47 {
48         int fd;
49         ETERM *fun;
50 };
51 typedef struct ce_callback_info_s ce_callback_info_t;
52
53 /*
54  * Private variables
55  */
56 static pthread_t listen_thread_id;
57 static _Bool     listen_thread_running = false;
58
59 static const char *config_keys[] =
60 {
61         "BindTo",
62         "BindPort",
63         "Cookie",
64         "NodeName"
65 };
66 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
67
68 static char conf_node[NI_MAXHOST] = "";
69 static char conf_service[NI_MAXSERV] = "29157";
70 static char conf_cookie[256] = "ceisaequ";
71 static char conf_hostname[256] = "alyja";
72 static char conf_nodename[256] = "collectd";
73 static char conf_fullname[256] = "collectd@alyja.office.noris.de";
74
75 static int connection_counter = 1;
76 static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
77
78 /*
79  * Private functions
80  */
81 static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */
82 {
83         if (ci == NULL)
84                 return;
85
86         if (ci->fd >= 0)
87         {
88                 erl_close_connection (ci->fd);
89                 ci->fd = -1;
90         }
91
92         if (ci->fun != NULL)
93                 erl_free_compound (ci->fun);
94
95         free (ci);
96 } /* }}} void ce_free_callback_info */
97
98 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
99 {
100         ETERM *reply;
101         int status;
102
103         reply = erl_mk_atom (atom);
104         if (reply == NULL)
105                 return (ENOMEM);
106
107         status = erl_send (fd, to, reply);
108         erl_free_term (reply);
109
110         if (status == 1)
111                 return (0);
112         else
113                 return (erl_errno);
114 } /* }}} int send_atom */
115
116 static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
117 {
118         ETERM *reply;
119         int status;
120
121         DEBUG ("erlang plugin: send_error: message = %s.", message);
122         reply = erl_format ("{~a,~s}", "error", message);
123
124         status = erl_send (fd, to, reply);
125         if (status != 1)
126                 status = erl_errno;
127         else
128                 status = 0;
129
130         erl_free_term (reply);
131
132         return (status);
133 } /* }}} int send_error */
134
135 static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */
136 {
137         if ((term == NULL) || (ret_int == NULL))
138                 return (EINVAL);
139
140         switch (ERL_TYPE (term))
141         {
142                 case ERL_INTEGER:
143                         *ret_int = (int) ERL_INT_VALUE (term);
144                         break;
145
146                 case ERL_U_INTEGER:
147                         *ret_int = (int) ERL_INT_UVALUE (term);
148                         break;
149
150                 case ERL_FLOAT:
151                         *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5);
152                         break;
153
154 #ifdef ERL_LONGLONG
155                 case ERL_LONGLONG:
156                         *ret_int = (int) ERL_LL_VALUE (term);
157                         break;
158 #endif /* ERL_LONGLONG */
159
160 #ifdef ERL_U_LONGLONG
161                 case ERL_U_LONGLONG:
162                         *ret_int = (int) ERL_LL_UVALUE (term);
163                         break;
164 #endif /* ERL_U_LONGLONG */
165
166                 default:
167                         ERROR ("erlang plugin: Don't know how to cast "
168                                         "erlang type %#x to int.", (unsigned int) ERL_TYPE (term));
169                         return (ENOTSUP);
170         } /* switch (ERL_TYPE (term)) */
171
172         return (0);
173 } /* }}} int eterm_to_int */
174
175 static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */
176 {
177         if ((term == NULL) || (ret_time == NULL))
178                 return (EINVAL);
179
180         if (ERL_IS_NIL (term)
181                         || (ERL_IS_ATOM (term)
182                                 && ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
183                                         || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))))
184         {
185                 *ret_time = time (NULL);
186                 return (0);
187         }
188
189         switch (ERL_TYPE (term))
190         {
191                 case ERL_INTEGER:
192                         *ret_time = (time_t) ERL_INT_VALUE (term);
193                         break;
194
195                 case ERL_U_INTEGER:
196                         *ret_time = (time_t) ERL_INT_UVALUE (term);
197                         break;
198
199                 case ERL_ATOM:
200                         if ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
201                                         || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))
202                         {
203                                 *ret_time = time (NULL);
204                         }
205                         else
206                         {
207                                 ERROR ("erlang plugin: Invalid atom for time: %s.",
208                                                 ERL_ATOM_PTR (term));
209                                 return (ENOTSUP);
210                         }
211                         break;
212
213                 case ERL_FLOAT:
214                         *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5);
215                         break;
216
217 #ifdef ERL_LONGLONG
218                 case ERL_LONGLONG:
219                         *ret_time = (time_t) ERL_LL_VALUE (term);
220                         break;
221 #endif /* ERL_LONGLONG */
222
223 #ifdef ERL_U_LONGLONG
224                 case ERL_U_LONGLONG:
225                         *ret_time = (time_t) ERL_LL_UVALUE (term);
226                         break;
227 #endif /* ERL_U_LONGLONG */
228
229                 default:
230                         ERROR ("erlang plugin: Don't know how to cast "
231                                         "erlang type %#x to time_t.", (unsigned int) ERL_TYPE (term));
232                         return (ENOTSUP);
233         } /* switch (ERL_TYPE (term)) */
234
235         return (0);
236 } /* }}} int eterm_to_time_t */
237
238 static int eterm_to_string (const ETERM *term, char *buffer, size_t buffer_size) /* {{{ */
239 {
240         char *tmp;
241
242         if ((term == NULL) || (buffer == NULL) || (buffer_size <= 0))
243                 return (EINVAL);
244
245         memset (buffer, 0, buffer_size);
246
247         if (ERL_IS_EMPTY_LIST (term)
248                         || ERL_IS_NIL (term)
249                         || (ERL_IS_ATOM (term)
250                                 && (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0)))
251         {
252                 buffer[0] = 0;
253                 return (0);
254         }
255
256         if (!ERL_IS_LIST (term))
257                 return (-1);
258
259         tmp = erl_iolist_to_string (term);
260         if (tmp == NULL)
261                 return (-1);
262
263         strncpy (buffer, tmp, buffer_size - 1);
264         erl_free (tmp);
265
266         return (0);
267 } /* }}} int eterm_to_string */
268
269 static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
270                 value_t *value)
271 {
272         if ((term == NULL) || (value == NULL))
273                 return (EINVAL);
274
275         switch (ERL_TYPE (term))
276         {
277                 case ERL_INTEGER:
278                 {
279                         int v = ERL_INT_VALUE (term);
280                         switch (ds_type)
281                         {
282                                 case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
283                                 case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
284                                 case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
285                                 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
286                         }
287                         break;
288                 }
289
290                 case ERL_U_INTEGER:
291                 {
292                         unsigned int v = ERL_INT_UVALUE (term);
293                         switch (ds_type)
294                         {
295                                 case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
296                                 case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
297                                 case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
298                                 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
299                         }
300                         break;
301                 }
302
303                 case ERL_FLOAT:
304                 {
305                         double v = ERL_FLOAT_VALUE (term);
306                         switch (ds_type)
307                         {
308                                 case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
309                                 case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
310                                 case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
311                                 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
312                         }
313                         break;
314                 }
315
316 #ifdef ERL_LONGLONG
317                 case ERL_LONGLONG:
318                 {
319                         long long v = ERL_LL_VALUE (term);
320                         switch (ds_type)
321                         {
322                                 case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
323                                 case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
324                                 case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
325                                 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
326                         }
327                         break;
328                 }
329 #endif /* ERL_LONGLONG */
330
331 #ifdef ERL_U_LONGLONG
332                 case ERL_U_LONGLONG:
333                 {
334                         unsigned long long v = ERL_LL_UVALUE (term);
335                         switch (ds_type)
336                         {
337                                 case DS_TYPE_COUNTER:  value->counter  = (counter_t)  v; break;
338                                 case DS_TYPE_GAUGE:    value->gauge    = (gauge_t)    v; break;
339                                 case DS_TYPE_DERIVE:   value->derive   = (derive_t)   v; break;
340                                 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
341                         }
342                         break;
343                 }
344 #endif /* ERL_U_LONGLONG */
345
346                 default:
347                         ERROR ("erlang plugin: Don't know how to cast "
348                                         "erlang type %#x to value_t.", (unsigned int) ERL_TYPE (term));
349                         return (ENOTSUP);
350         } /* switch (ERL_TYPE (term)) */
351
352         return (0);
353 } /* }}} int eterm_to_value */
354
355 static int eterm_to_values (const ETERM *term, const data_set_t *ds, /* {{{ */
356                 value_list_t *vl)
357 {
358         int ds_index;
359         int status;
360
361         if ((term == NULL) || (ds == NULL) || (vl == NULL))
362                 return (EINVAL);
363
364         if (!ERL_IS_LIST (term))
365                 return (-1);
366
367         free (vl->values);
368         vl->values = NULL;
369         vl->values_len = 0;
370
371         while (!ERL_IS_EMPTY_LIST (term))
372         {
373                 const ETERM *eterm_value;
374                 value_t *tmp;
375
376                 if (ds_index >= ds->ds_num)
377                 {
378                         ds_index = ds->ds_num + 1;
379                         status = 0;
380                         break;
381                 }
382
383                 tmp = realloc (vl->values, sizeof (*tmp) * (vl->values_len + 1));
384                 if (tmp == NULL)
385                 {
386                         status = ENOMEM;
387                         break;
388                 }
389                 vl->values = tmp;
390
391                 eterm_value = ERL_CONS_HEAD (term);
392                 term = ERL_CONS_TAIL (term);
393
394                 status = eterm_to_value (eterm_value, ds->ds[ds_index].type,
395                                 vl->values + vl->values_len);
396                 if (status != 0)
397                         break;
398
399                 vl->values_len++;
400                 ds_index++;
401         }
402
403         if ((status == 0) && (ds_index != ds->ds_num))
404                 NOTICE ("erlang plugin: Incorrect number of values received for type %s: "
405                                 "Expected %i, got %i.", ds->type, ds->ds_num, ds_index);
406
407         if ((status != 0) || (ds_index != ds->ds_num))
408         {
409                 free (vl->values);
410                 vl->values = NULL;
411                 vl->values_len = 0;
412                 return (status);
413         }
414
415         return (0);
416 } /* }}} int eterm_to_values */
417
418 static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */
419 {
420         ETERM *tmp;
421         int status;
422         const data_set_t *ds;
423
424         if ((term == NULL) || (vl == NULL))
425                 return (EINVAL);
426
427         if (!ERL_IS_TUPLE (term) || (ERL_TUPLE_SIZE (term) != 9))
428                 return (EINVAL);
429
430         tmp = erl_element (1, term);
431         if (!ERL_IS_ATOM (tmp)
432                         || (strcmp ("value_list", ERL_ATOM_PTR (tmp)) != 0))
433         {
434                 erl_free_term (tmp);
435                 return (-1);
436         }
437         erl_free_term (tmp);
438
439         status = 0;
440         do
441         {
442 #define TUPLE_ELEM_TO_CHAR_ARRAY(idx,buf) \
443                 tmp = erl_element ((idx), term); \
444                 status = eterm_to_string (tmp, (buf), sizeof (buf)); \
445                 erl_free_term (tmp); \
446                 if (status != 0) \
447                         break;
448
449                 TUPLE_ELEM_TO_CHAR_ARRAY (2, vl->host);
450                 TUPLE_ELEM_TO_CHAR_ARRAY (3, vl->plugin);
451                 TUPLE_ELEM_TO_CHAR_ARRAY (4, vl->plugin_instance);
452                 TUPLE_ELEM_TO_CHAR_ARRAY (5, vl->type);
453                 TUPLE_ELEM_TO_CHAR_ARRAY (6, vl->type_instance);
454
455                 ds = plugin_get_ds (vl->type);
456                 if (ds == NULL)
457                 {
458                         status = -1;
459                         break;
460                 }
461
462                 tmp = erl_element (7, term);
463                 status = eterm_to_time_t (tmp, &vl->time);
464                 erl_free_term (tmp);
465                 if (status != 0)
466                         break;
467
468                 tmp = erl_element (8, term);
469                 status = eterm_to_int (tmp, &vl->interval);
470                 erl_free_term (tmp);
471                 if (status != 0)
472                         break;
473                 if (vl->interval < 1)
474                         vl->interval = interval_g;
475
476                 tmp = erl_element (9, term);
477                 status = eterm_to_values (tmp, ds, vl);
478                 erl_free_term (tmp);
479                 if (status != 0)
480                         break;
481
482 #undef TUPLE_ELEM_TO_CHAR_ARRAY
483         } while (0);
484
485         if (status != 0)
486                 return (status);
487
488         /* validate the struct */
489         if ((vl->host[0] == 0) || (vl->plugin[0] == 0) || (vl->type[0] == 0))
490                 return (-1);
491
492         if (ds->ds_num != vl->values_len)
493                 return (-1);
494
495         return (0);
496 } /* }}} int eterm_to_value_list */
497
498 static int ce_read (user_data_t *ud) /* {{{ */
499 {
500         ce_callback_info_t *ci;
501         ETERM *rpc_args;
502         ETERM *rpc_reply;
503
504         if ((ud == NULL) || (ud->data == NULL))
505                 return (-1);
506
507         ci = ud->data;
508         
509         rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun));
510         if (rpc_args == NULL)
511         {
512                 ERROR ("erlang plugin: erl_format failed.");
513                 return (-1);
514         }
515
516         DEBUG ("erlang plugin: Making remote procedure call ...");
517         rpc_reply = erl_rpc (ci->fd,
518                         /* module = */ "erlang", /* function = */ "apply",
519                         /* arguments = */ rpc_args);
520         DEBUG ("erlang plugin: ... done.");
521         erl_free_compound (rpc_args);
522         if (rpc_reply == NULL)
523         {
524                         char errbuf[1024];
525                         ERROR ("erlang plugin: erl_rpc failed: %s",
526                                         sstrerror (erl_errno, errbuf, sizeof (errbuf)));
527                         return (-1);
528         }
529
530         /* FIXME: The return value is not yet used. */
531         erl_free_compound (rpc_reply);
532
533         return (0);
534 } /* }}} int ce_read */
535
536 /* Returns non-zero only if the request could not be handled gracefully. */
537 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
538                 const ErlMessage *req)
539 {
540         ETERM *eterm_vl;
541         value_list_t vl;
542         int status;
543
544         memset (&vl, 0, sizeof (vl));
545         vl.values = NULL;
546
547         eterm_vl = erl_element (2, req->msg);
548         status = eterm_to_value_list (eterm_vl, &vl);
549         erl_free_term (eterm_vl);
550
551         if (status != 0)
552         {
553                 free (vl.values);
554                 status = send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
555                 return (status);
556         }
557
558         status = plugin_dispatch_values (&vl);
559         if (status != 0)
560         {
561                 free (vl.values);
562                 status = send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
563                 return (status);
564         }
565
566         free (vl.values);
567         status = send_atom (cinfo->fd, req->from, "success");
568
569         return (status);
570 } /* }}} int handle_dispatch_values */
571
572 /* Returns non-zero only if the request could not be handled gracefully. */
573 static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */
574                 const ErlMessage *req)
575 {
576         ETERM *eterm_cb;
577         ce_callback_info_t *ci;
578         user_data_t ud;
579         int status;
580         int connection_number;
581         char callback_name[64];
582
583         if ((cinfo == NULL) || (req == NULL))
584                 return (EINVAL);
585
586         eterm_cb = erl_element (2, req->msg);
587
588         if (ERL_TYPE (eterm_cb) != ERL_FUNCTION)
589         {
590                 erl_free_term (eterm_cb);
591                 status = send_error (cinfo->fd, req->from,
592                                 "Argument to `register_read' must be a callback function.");
593                 return (status);
594         }
595
596         ci = malloc (sizeof (ci));
597         if (ci == NULL)
598         {
599                 erl_free_term (eterm_cb);
600                 status = send_error (cinfo->fd, req->from, "malloc failed.");
601                 return (status);
602         }
603
604         /* Lock around `erl_connect_init' and `erl_connect'. */
605         pthread_mutex_lock (&connection_lock);
606
607         connection_number = connection_counter;
608         connection_counter++;
609
610         /* Create a new `cnode' for each connection. Otherwise we cannot determine
611          * which RPC call a message belongs to. */
612         status = erl_connect_init (connection_number, conf_cookie,
613                         /* creation = */ 0);
614         if (!status) /* Yes, it's this way around in this case ... {{{ */
615         {
616                         char errbuf[1024];
617                         pthread_mutex_unlock (&connection_lock);
618                         ERROR ("erlang plugin: erl_connect_init failed: %s",
619                                         sstrerror (erl_errno, errbuf, sizeof (errbuf)));
620                         sfree (ci);
621                         erl_free_term (eterm_cb);
622                         status = send_error (cinfo->fd, req->from, "erl_connect failed.");
623                         return (status);
624         } /* }}} */
625
626         ci->fd = erl_connect (cinfo->conn.nodename);
627         if (ci->fd < 0) /* {{{ */
628         {
629                         char errbuf[1024];
630                         pthread_mutex_unlock (&connection_lock);
631                         ERROR ("erlang plugin: erl_connect(%s) failed: %s",
632                                         cinfo->conn.nodename,
633                                         sstrerror (erl_errno, errbuf, sizeof (errbuf)));
634                         sfree (ci);
635                         erl_free_term (eterm_cb);
636                         status = send_error (cinfo->fd, req->from, "erl_connect failed.");
637                         return (status);
638         } /* }}} */
639
640         pthread_mutex_unlock (&connection_lock);
641
642         ci->fun = eterm_cb;
643
644         memset (&ud, 0, sizeof (ud));
645         ud.data = ci;
646         ud.free_func = (void (*) (void *)) ce_free_callback_info;
647
648         ssnprintf (callback_name, sizeof (callback_name), "erlang:%i",
649                         connection_number);
650
651         status = plugin_register_complex_read (callback_name,
652                         ce_read, /* interval = */ NULL, &ud);
653         if (status == 0)
654                 status = send_atom (cinfo->fd, req->from, "success");
655         else
656                 status = send_error (cinfo->fd, req->from,
657                                 "plugin_register_complex_read failed.");
658
659         return (status);
660 } /* }}} int handle_dispatch_values */
661
662 static void *handle_client_thread (void *arg) /* {{{ */
663 {
664         ce_connection_info_t *cinfo;
665         ErlMessage emsg;
666         unsigned char buffer[4096];
667
668         cinfo = arg;
669
670         DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.",
671                         cinfo->fd, cinfo->conn.nodename);
672
673         emsg.from = NULL;
674         emsg.to = NULL;
675         emsg.msg = NULL;
676
677         while (42)
678         {
679                 int status;
680
681                 erl_free_term (emsg.from);
682                 emsg.from = NULL;
683                 erl_free_term (emsg.to);
684                 emsg.to = NULL;
685                 erl_free_term (emsg.msg);
686                 emsg.msg = NULL;
687
688                 status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg);
689                 if (status == ERL_TICK)
690                         continue;
691
692                 if (status == ERL_ERROR)
693                         break;
694
695                 if (emsg.type == ERL_REG_SEND)
696                 {
697                         ETERM *func;
698                         ETERM *reply;
699
700                         if (!ERL_IS_TUPLE (emsg.msg))
701                         {
702                                 ERROR ("erlang plugin: Message is not a tuple.");
703                                 send_atom (cinfo->fd, emsg.from, "error");
704                                 continue;
705                         }
706
707                         func = erl_element (1, emsg.msg);
708                         if (!ERL_IS_ATOM (func))
709                         {
710                                 ERROR ("erlang plugin: First element is not an atom!");
711                                 send_atom (cinfo->fd, emsg.from, "error");
712                                 erl_free_term (func);
713                                 continue;
714                         }
715
716                         DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func));
717                         reply = NULL;
718                         if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
719                                 status = handle_dispatch_values (cinfo, &emsg);
720                         else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0)
721                                 status = handle_register_read (cinfo, &emsg);
722                         else
723                         {
724                                 ERROR ("erlang plugin: Received request for invalid function `%s'.",
725                                                 ERL_ATOM_PTR (func));
726                                 send_atom (cinfo->fd, emsg.from, "error");
727                                 status = 0;
728                         }
729
730                         /* Check for fatal errors in the callback functions. */
731                         if (status != 0)
732                         {
733                                 ERROR ("erlang plugin: Handling request for `%s' failed.",
734                                                 ERL_ATOM_PTR (func));
735                                 erl_free_term (func);
736                                 break;
737                         }
738
739                         erl_free_term (func);
740                 }
741                 else if (emsg.type == ERL_EXIT)
742                 {
743                         DEBUG ("erlang plugin: handle_client_thread[%i]: "
744                                         "Received exit message.", cinfo->fd);
745                         break;
746                 }
747                 else
748                 {
749                         ERROR ("erlang plugin: Message type not handled: %i.", emsg.type);
750                 }
751         } /* while (42) */
752
753         erl_free_term (emsg.from);
754         emsg.from = NULL;
755         erl_free_term (emsg.to);
756         emsg.to = NULL;
757         erl_free_term (emsg.msg);
758         emsg.msg = NULL;
759
760         DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd);
761
762         close (cinfo->fd);
763         free (cinfo);
764
765         pthread_exit ((void *) 0);
766         return ((void *) 0);
767 } /* }}} void *handle_client_thread */
768
769 static int create_listen_socket (void) /* {{{ */
770 {
771         struct addrinfo ai_hints;
772         struct addrinfo *ai_list;
773         struct addrinfo *ai_ptr;
774         int sock_descr;
775         int status;
776         int numeric_serv;
777
778         sock_descr = -1;
779
780         memset (&ai_hints, 0, sizeof (ai_hints));
781         /* AI_PASSIVE => returns INADDR_ANY */
782         ai_hints.ai_flags = AI_PASSIVE;
783 #ifdef AI_ADDRCONFIG
784         ai_hints.ai_flags |= AI_ADDRCONFIG;
785 #endif
786         /* IPv4 only :( */
787         ai_hints.ai_family = AF_INET;
788         ai_hints.ai_socktype = SOCK_STREAM;
789
790         ai_list = NULL;
791         status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service,
792                         &ai_hints, &ai_list);
793         if (status != 0)
794         {
795                 ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status));
796                 return (-1);
797         }
798
799         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
800         {
801                 struct sockaddr_in *sa_in;
802                 struct in_addr *sin_addr;
803                 int yes;
804
805                 assert (ai_ptr->ai_family == AF_INET);
806                 sa_in = (struct sockaddr_in *) ai_ptr->ai_addr;
807                 sin_addr = &sa_in->sin_addr;
808                 numeric_serv = (int) ntohs (sa_in->sin_port);
809
810                 /* Dunno if calling this multiple times is legal. Since it wants to have
811                  * the sin_addr for some reason this is the best place to call this,
812                  * though. -octo */
813                 status = erl_connect_xinit (/* host name = */ conf_hostname,
814                                 /* plain node name = */ conf_nodename,
815                                 /* full node name  = */ conf_fullname,
816                                 /* our address     = */ sin_addr,
817                                 /* secret cookie   = */ conf_cookie,
818                                 /* instance number = */ 0);
819                 if (status < 0)
820                 {
821                         ERROR ("erlang plugin: erl_connect_xinit failed with status %i.",
822                                         status);
823                         continue;
824                 }
825
826                 sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
827                                 ai_ptr->ai_protocol);
828                 if (sock_descr < 0)
829                 {
830                         ERROR ("erlang plugin: socket(2) failed.");
831                         continue;
832                 }
833
834                 yes = 1;
835                 status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
836                 if (status != 0)
837                 {
838                         ERROR ("erlang plugin: setsockopt(2) failed.");
839                         close (sock_descr);
840                         sock_descr = -1;
841                         continue;
842                 }
843
844                 status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
845                 if (status != 0)
846                 {
847                         ERROR ("erlang plugin: bind(2) failed.");
848                         close (sock_descr);
849                         sock_descr = -1;
850                         continue;
851                 }
852
853                 status = listen (sock_descr, /* backlog = */ 10);
854                 if (status != 0)
855                 {
856                         ERROR ("erlang plugin: listen(2) failed.");
857                         close (sock_descr);
858                         sock_descr = -1;
859                         continue;
860                 }
861
862                 break;
863         } /* for (ai_list) */
864
865         freeaddrinfo (ai_list);
866
867         if (sock_descr >= 0)
868         {
869                 status = erl_publish (numeric_serv);
870                 if (status < 0)
871                 {
872                         ERROR ("erlang plugin: erl_publish (%i) failed with status %i. "
873                                         "Is epmd running?", numeric_serv, status);
874                         close (sock_descr);
875                         sock_descr = -1;
876                         return (-1);
877                 }
878         }
879
880         if (sock_descr >= 0)
881         {
882                 INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, "
883                                 "Cookie %s.",
884                                 conf_fullname, numeric_serv, conf_cookie);
885         }
886
887         return (sock_descr);
888 } /* }}} int create_listen_socket */
889
890 void *listen_thread (void *arg) /* {{{ */
891 {
892         int listen;
893         int fd;
894
895         ErlConnect conn;
896
897         /* I have no fucking idea what this does, nor what the arguments are. Didn't
898          * find any comprehensive docs yet. */
899         erl_init (/* void *x = */ NULL, /* long y = */ 0);
900
901         listen = create_listen_socket ();
902         if (listen < 0)
903                 exit (EXIT_FAILURE);
904
905         while (42)
906         {
907                 pthread_t tid;
908                 pthread_attr_t tattr;
909                 ce_connection_info_t *arg;
910
911                 fd = erl_accept (listen, &conn);
912                 if (fd < 0)
913                 {
914                         char errbuf[1024];
915                         ERROR ("erlang plugin: erl_accept failed: %s",
916                                         sstrerror (erl_errno, errbuf, sizeof (errbuf)));
917                         continue;
918                 }
919                 DEBUG ("erlang plugin: Got connection from %s on fd %i.",
920                                 conn.nodename, fd);
921
922                 pthread_attr_init (&tattr);
923                 pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED);
924
925                 arg = malloc (sizeof (*arg));
926                 if (arg == NULL)
927                 {
928                         ERROR ("erlang plugin: malloc failed.");
929                         close (fd);
930                         continue;
931                 }
932                 memset (arg, 0, sizeof (*arg));
933
934                 arg->fd = fd;
935                 memcpy (&arg->conn, &conn, sizeof (conn));
936
937                 pthread_create (&tid, &tattr, handle_client_thread, arg);
938         } /* while (42) */
939
940         pthread_exit ((void *) 0);
941         return ((void *) 0);
942 } /* }}} void *listen_thread */
943
944 static int ce_init (void) /* {{{ */
945 {
946         if (!listen_thread_running)
947         {
948                 int status;
949
950                 status = pthread_create (&listen_thread_id,
951                                 /* attr = */ NULL,
952                                 listen_thread,
953                                 /* args = */ NULL);
954                 if (status == 0)
955                         listen_thread_running = true;
956         }
957
958         return (0);
959 } /* }}} int ce_init */
960
961 static int ce_config (const char *key, const char *value) /* {{{ */
962 {
963         if (strcasecmp ("BindTo", key) == 0)
964         {
965                 sstrncpy (conf_node, value, sizeof (conf_node));
966         }
967         else if (strcasecmp ("BindPort", key) == 0)
968         {
969                 sstrncpy (conf_service, value, sizeof (conf_service));
970         }
971         else if (strcasecmp ("Cookie", key) == 0)
972         {
973                 sstrncpy (conf_cookie, value, sizeof (conf_cookie));
974         }
975         else if (strcasecmp ("NodeName", key) == 0)
976         {
977                 const char *host;
978
979                 host = strchr (value, '@');
980                 if (host == NULL)
981                 {
982                         sstrncpy (conf_nodename, value, sizeof (conf_nodename));
983                         sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname));
984                         ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s",
985                                         conf_nodename, conf_hostname);
986                 }
987                 else /* if (host != NULL) */
988                 {
989                         char *tmp;
990
991                         sstrncpy (conf_nodename, value, sizeof (conf_nodename));
992                         sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname));
993                         sstrncpy (conf_fullname, value, sizeof (conf_fullname));
994
995                         tmp = strchr (conf_nodename, '@');
996                         if (tmp != NULL)
997                                 *tmp = 0;
998                 }
999         }
1000         else
1001         {
1002                 return (-1);
1003         }
1004
1005         return (0);
1006 } /* }}} int ce_config */
1007
1008 void module_register (void)
1009 {
1010         plugin_register_config ("erlang", ce_config, config_keys, config_keys_num);
1011         plugin_register_init ("erlang", ce_init);
1012 }
1013
1014 /* vim: set sw=2 ts=2 noet fdm=marker : */