2 * collectd - src/erlang.c
3 * Copyright (C) 2009 Florian octo Forster
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Florian octo Forster <octo at verplant.org>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
33 #include <erl_interface.h>
37 * Private data structures
/* Per-client connection state; this file reads its `fd' and `conn' members. */
39 struct ce_connection_info_s
44 typedef struct ce_connection_info_s ce_connection_info_t;
/* State attached to a registered read callback; this file reads its `fd'
 * (the connection used for the RPC) and `fun' (the Erlang function term). */
46 struct ce_callback_info_s
51 typedef struct ce_callback_info_s ce_callback_info_t;
/* Handle of the listener thread and the flag ce_init() checks so that the
 * thread is started only once. */
56 static pthread_t listen_thread_id;
57 static _Bool listen_thread_running = false;
/* Configuration keys handed to plugin_register_config(). */
59 static const char *config_keys[] =
66 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
/* Configuration values; ce_config() overwrites them.
 * NOTE(review): the default host name and full node name look site-specific
 * -- confirm that they are meant to ship as defaults. */
68 static char conf_node[NI_MAXHOST] = "";
69 static char conf_service[NI_MAXSERV] = "29157";
70 static char conf_cookie[256] = "ceisaequ";
71 static char conf_hostname[256] = "alyja";
72 static char conf_nodename[256] = "collectd";
73 static char conf_fullname[256] = "collectd@alyja.office.noris.de";
/* Instance number passed to erl_connect_init() for the next read callback;
 * handle_register_read() holds connection_lock while reading and incrementing
 * it. */
75 static int connection_counter = 1;
76 static pthread_mutex_t connection_lock = PTHREAD_MUTEX_INITIALIZER;
/*
 * Releases a callback info object: closes its Erlang connection (ci->fd) and
 * frees the stored function term (ci->fun). Installed as the user-data
 * free_func by handle_register_read().
 */
81 static void ce_free_callback_info (ce_callback_info_t *ci) /* {{{ */
88 erl_close_connection (ci->fd);
93 erl_free_compound (ci->fun);
96 } /* }}} void ce_free_callback_info */
/*
 * Builds the atom named `atom' with erl_mk_atom(), sends it to `to' over the
 * connection `fd' with erl_send() and frees the atom term afterwards.
 */
98 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
103 reply = erl_mk_atom (atom);
107 status = erl_send (fd, to, reply);
108 erl_free_term (reply);
114 } /* }}} int send_atom */
/*
 * Sends the tuple {error, Message} to `to' over the connection `fd'. The
 * tuple is built with erl_format() and released with erl_free_compound().
 */
116 static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
121 DEBUG ("erlang plugin: send_error: message = %s.", message);
122 reply = erl_format ("{~a,~s}", "error", message);
124 status = erl_send (fd, to, reply);
130 erl_free_compound (reply);
133 } /* }}} int send_error */
/*
 * Converts an Erlang number term to a C int and stores it in *ret_int.
 * Signed and unsigned integers, floats (rounded by adding .5 before the cast)
 * and, where the library defines them, (unsigned) long long values are
 * supported. Any other term type is logged as an error.
 * NOTE(review): all conversions are plain casts, so values outside the range
 * of int are not detected, and the `+ .5' rounding is only correct for
 * non-negative floats.
 */
135 static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */
137 if ((term == NULL) || (ret_int == NULL))
140 switch (ERL_TYPE (term))
143 *ret_int = (int) ERL_INT_VALUE (term);
147 *ret_int = (int) ERL_INT_UVALUE (term);
151 *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5);
156 *ret_int = (int) ERL_LL_VALUE (term);
158 #endif /* ERL_LONGLONG */
160 #ifdef ERL_U_LONGLONG
162 *ret_int = (int) ERL_LL_UVALUE (term);
164 #endif /* ERL_U_LONGLONG */
167 ERROR ("erlang plugin: Don't know how to cast "
168 "erlang type %#x to int.", (unsigned int) ERL_TYPE (term));
170 } /* switch (ERL_TYPE (term)) */
173 } /* }}} int eterm_to_int */
/*
 * Converts an Erlang term to a time_t stored in *ret_time. Nil and the atoms
 * `now' and `undefined' yield the current time; numeric terms are cast to
 * time_t (floats are rounded by adding .5 before the cast). Other atoms and
 * unsupported term types are logged as errors.
 */
175 static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */
177 if ((term == NULL) || (ret_time == NULL))
180 if (ERL_IS_NIL (term)
181 || (ERL_IS_ATOM (term)
182 && ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
183 || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))))
185 *ret_time = time (NULL);
189 switch (ERL_TYPE (term))
192 *ret_time = (time_t) ERL_INT_VALUE (term);
196 *ret_time = (time_t) ERL_INT_UVALUE (term);
200 if ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
201 || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))
203 *ret_time = time (NULL);
207 ERROR ("erlang plugin: Invalid atom for time: %s.",
208 ERL_ATOM_PTR (term));
214 *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5);
219 *ret_time = (time_t) ERL_LL_VALUE (term);
221 #endif /* ERL_LONGLONG */
223 #ifdef ERL_U_LONGLONG
225 *ret_time = (time_t) ERL_LL_UVALUE (term);
227 #endif /* ERL_U_LONGLONG */
230 ERROR ("erlang plugin: Don't know how to cast "
231 "erlang type %#x to time_t.", (unsigned int) ERL_TYPE (term));
233 } /* switch (ERL_TYPE (term)) */
236 } /* }}} int eterm_to_time_t */
/*
 * Copies the string held by an Erlang list term into `buffer'. The buffer is
 * zeroed first and at most buffer_size - 1 bytes are copied, so the result is
 * always NUL-terminated (longer strings are truncated). The empty list and
 * the atom `undefined' are special-cased ahead of the conversion; terms that
 * are not lists take a separate branch, presumably an error -- confirm.
 * NOTE(review): the erl_interface manual says the string returned by
 * erl_iolist_to_string() must be released with erl_free(); confirm that this
 * happens.
 */
238 static int eterm_to_string (const ETERM *term, char *buffer, size_t buffer_size) /* {{{ */
242 if ((term == NULL) || (buffer == NULL) || (buffer_size <= 0))
245 memset (buffer, 0, buffer_size);
247 if (ERL_IS_EMPTY_LIST (term)
249 || (ERL_IS_ATOM (term)
250 && (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0)))
256 if (!ERL_IS_LIST (term))
259 tmp = erl_iolist_to_string (term);
263 strncpy (buffer, tmp, buffer_size - 1);
267 } /* }}} int eterm_to_string */
/*
 * Converts an Erlang number term into the value_t member selected by ds_type
 * (counter, gauge, derive or absolute). The number is read according to the
 * term's Erlang type (int, unsigned int, double and, where the library
 * defines them, (unsigned) long long) and then cast to the member's type.
 * Unsupported term types are logged as errors.
 */
269 static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
272 if ((term == NULL) || (value == NULL))
275 switch (ERL_TYPE (term))
279 int v = ERL_INT_VALUE (term);
282 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
283 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
284 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
285 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
292 unsigned int v = ERL_INT_UVALUE (term);
295 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
296 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
297 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
298 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
305 double v = ERL_FLOAT_VALUE (term);
308 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
309 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
310 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
311 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
319 long long v = ERL_LL_VALUE (term);
322 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
323 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
324 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
325 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
329 #endif /* ERL_LONGLONG */
331 #ifdef ERL_U_LONGLONG
334 unsigned long long v = ERL_LL_UVALUE (term);
337 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
338 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
339 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
340 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
344 #endif /* ERL_U_LONGLONG */
347 ERROR ("erlang plugin: Don't know how to cast "
348 "erlang type %#x to value_t.", (unsigned int) ERL_TYPE (term));
350 } /* switch (ERL_TYPE (term)) */
353 } /* }}} int eterm_to_value */
/*
 * Converts an Erlang list of numbers into vl->values. The data source types
 * in `ds' select the value_t member for each element, and the array is grown
 * with realloc() by one element per list entry. A list whose length differs
 * from ds->ds_num is reported with a NOTICE and handled in the same branch as
 * a conversion failure.
 */
355 static int eterm_to_values (const ETERM *term, const data_set_t *ds, /* {{{ */
361 if ((term == NULL) || (ds == NULL) || (vl == NULL))
364 if (!ERL_IS_LIST (term))
371 while (!ERL_IS_EMPTY_LIST (term))
373 const ETERM *eterm_value;
376 if (ds_index >= ds->ds_num)
378 ds_index = ds->ds_num + 1;
383 tmp = realloc (vl->values, sizeof (*tmp) * (vl->values_len + 1));
391 eterm_value = ERL_CONS_HEAD (term);
392 term = ERL_CONS_TAIL (term);
394 status = eterm_to_value (eterm_value, ds->ds[ds_index].type,
395 vl->values + vl->values_len);
403 if ((status == 0) && (ds_index != ds->ds_num))
404 NOTICE ("erlang plugin: Incorrect number of values received for type %s: "
405 "Expected %i, got %i.", ds->type, ds->ds_num, ds_index);
407 if ((status != 0) || (ds_index != ds->ds_num))
416 } /* }}} int eterm_to_values */
/*
 * Fills `vl' from an Erlang tuple of the form
 *   {value_list, Host, Plugin, PluginInstance, Type, TypeInstance,
 *    Time, Interval, Values}
 * The tuple must have exactly nine elements and start with the atom
 * `value_list'. The data set is looked up from the type name with
 * plugin_get_ds(), and an interval below 1 is replaced by interval_g. At the
 * end host, plugin and type are checked for being non-empty and the number of
 * values is compared with the data set.
 */
418 static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */
422 const data_set_t *ds;
424 if ((term == NULL) || (vl == NULL))
427 if (!ERL_IS_TUPLE (term) || (ERL_TUPLE_SIZE (term) != 9))
430 tmp = erl_element (1, term);
431 if (!ERL_IS_ATOM (tmp)
432 || (strcmp ("value_list", ERL_ATOM_PTR (tmp)) != 0))
442 #define TUPLE_ELEM_TO_CHAR_ARRAY(idx,buf) \
443 tmp = erl_element ((idx), term); \
444 status = eterm_to_string (tmp, (buf), sizeof (buf)); \
445 erl_free_term (tmp); \
449 TUPLE_ELEM_TO_CHAR_ARRAY (2, vl->host);
450 TUPLE_ELEM_TO_CHAR_ARRAY (3, vl->plugin);
451 TUPLE_ELEM_TO_CHAR_ARRAY (4, vl->plugin_instance);
452 TUPLE_ELEM_TO_CHAR_ARRAY (5, vl->type);
453 TUPLE_ELEM_TO_CHAR_ARRAY (6, vl->type_instance);
455 ds = plugin_get_ds (vl->type);
462 tmp = erl_element (7, term);
463 status = eterm_to_time_t (tmp, &vl->time);
468 tmp = erl_element (8, term);
469 status = eterm_to_int (tmp, &vl->interval);
473 if (vl->interval < 1)
474 vl->interval = interval_g;
476 tmp = erl_element (9, term);
477 status = eterm_to_values (tmp, ds, vl);
482 #undef TUPLE_ELEM_TO_CHAR_ARRAY
488 /* validate the struct */
489 if ((vl->host[0] == 0) || (vl->plugin[0] == 0) || (vl->type[0] == 0))
492 if (ds->ds_num != vl->values_len)
496 } /* }}} int eterm_to_value_list */
/*
 * Read callback registered by handle_register_read(). Calls the stored
 * Erlang function through the RPC erlang:apply (Fun, []) on the callback's
 * own connection (ci->fd). The argument list and the reply are both freed
 * here; the reply's content is not evaluated yet.
 */
498 static int ce_read (user_data_t *ud) /* {{{ */
500 ce_callback_info_t *ci;
504 if ((ud == NULL) || (ud->data == NULL))
509 rpc_args = erl_format ("[~w,[]]", erl_copy_term (ci->fun));
510 if (rpc_args == NULL)
512 ERROR ("erlang plugin: erl_format failed.");
516 DEBUG ("erlang plugin: Making remote procedure call ...");
517 rpc_reply = erl_rpc (ci->fd,
518 /* module = */ "erlang", /* function = */ "apply",
519 /* arguments = */ rpc_args);
520 DEBUG ("erlang plugin: ... done.");
521 erl_free_compound (rpc_args);
522 if (rpc_reply == NULL)
525 ERROR ("erlang plugin: erl_rpc failed: %s",
526 sstrerror (erl_errno, errbuf, sizeof (errbuf)));
530 /* FIXME: The return value is not yet used. */
531 erl_free_compound (rpc_reply);
534 } /* }}} int ce_read */
/*
 * Handles a `dispatch_values' request: parses the second element of the
 * request tuple as a value list and passes it to plugin_dispatch_values().
 * The sender receives the atom `success', or an {error, Message} tuple when
 * parsing or dispatching fails.
 */
536 /* Returns non-zero only if the request could not be handled gracefully. */
537 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
538 const ErlMessage *req)
544 memset (&vl, 0, sizeof (vl));
547 eterm_vl = erl_element (2, req->msg);
548 status = eterm_to_value_list (eterm_vl, &vl);
549 erl_free_term (eterm_vl);
554 status = send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
558 status = plugin_dispatch_values (&vl);
562 status = send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
567 status = send_atom (cinfo->fd, req->from, "success");
570 } /* }}} int handle_dispatch_values */
/*
 * Handles a `register_read' request. The second element of the request tuple
 * must be an Erlang function. A callback info object is allocated, a
 * dedicated cnode connection to the requesting node is opened while holding
 * connection_lock, and ce_read() is registered as a complex read callback
 * under a name of the form "erlang:<number>". The sender receives the atom
 * `success' or an {error, Message} tuple.
 */
572 /* Returns non-zero only if the request could not be handled gracefully. */
573 static int handle_register_read (ce_connection_info_t *cinfo, /* {{{ */
574 const ErlMessage *req)
577 ce_callback_info_t *ci;
580 int connection_number;
581 char callback_name[64];
583 if ((cinfo == NULL) || (req == NULL))
586 eterm_cb = erl_element (2, req->msg);
588 if (ERL_TYPE (eterm_cb) != ERL_FUNCTION)
590 erl_free_term (eterm_cb);
591 status = send_error (cinfo->fd, req->from,
592 "Argument to `register_read' must be a callback function.");
/* Allocate the whole structure, not just a pointer's worth of bytes. */
596 ci = malloc (sizeof (*ci));
599 erl_free_term (eterm_cb);
600 status = send_error (cinfo->fd, req->from, "malloc failed.");
604 /* Lock around `erl_connect_init' and `erl_connect'. */
605 pthread_mutex_lock (&connection_lock);
607 connection_number = connection_counter;
608 connection_counter++;
610 /* Create a new `cnode' for each connection. Otherwise we cannot determine
611 * which RPC call a message belongs to. */
612 status = erl_connect_init (connection_number, conf_cookie,
614 if (!status) /* Yes, it's this way around in this case ... {{{ */
617 pthread_mutex_unlock (&connection_lock);
618 ERROR ("erlang plugin: erl_connect_init failed: %s",
619 sstrerror (erl_errno, errbuf, sizeof (errbuf)));
621 erl_free_term (eterm_cb);
622 status = send_error (cinfo->fd, req->from, "erl_connect_init failed.");
626 ci->fd = erl_connect (cinfo->conn.nodename);
627 if (ci->fd < 0) /* {{{ */
630 pthread_mutex_unlock (&connection_lock);
631 ERROR ("erlang plugin: erl_connect(%s) failed: %s",
632 cinfo->conn.nodename,
633 sstrerror (erl_errno, errbuf, sizeof (errbuf)));
635 erl_free_term (eterm_cb);
636 status = send_error (cinfo->fd, req->from, "erl_connect failed.");
640 pthread_mutex_unlock (&connection_lock);
644 memset (&ud, 0, sizeof (ud));
646 ud.free_func = (void (*) (void *)) ce_free_callback_info;
648 ssnprintf (callback_name, sizeof (callback_name), "erlang:%i",
651 status = plugin_register_complex_read (callback_name,
652 ce_read, /* interval = */ NULL, &ud);
654 status = send_atom (cinfo->fd, req->from, "success");
656 status = send_error (cinfo->fd, req->from,
657 "plugin_register_complex_read failed.");
660 } /* }}} int handle_register_read */
/*
 * Thread function serving one connected Erlang node; listen_thread() starts
 * it with a ce_connection_info_t as argument. Messages are received with
 * erl_receive_msg() into a 4096 byte buffer. ERL_TICK and ERL_ERROR results
 * are checked first. An ERL_REG_SEND message must be a tuple whose first
 * element is an atom naming the request, `dispatch_values' or
 * `register_read'; anything else is answered with the atom `error'. A
 * non-graceful failure of a request handler is logged. ERL_EXIT messages are
 * handled separately and other message types are logged as errors.
 */
662 static void *handle_client_thread (void *arg) /* {{{ */
664 ce_connection_info_t *cinfo;
666 unsigned char buffer[4096];
670 DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.",
671 cinfo->fd, cinfo->conn.nodename);
681 erl_free_term (emsg.from);
683 erl_free_term (emsg.to);
685 erl_free_term (emsg.msg);
688 status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg);
689 if (status == ERL_TICK)
692 if (status == ERL_ERROR)
695 if (emsg.type == ERL_REG_SEND)
700 if (!ERL_IS_TUPLE (emsg.msg))
702 ERROR ("erlang plugin: Message is not a tuple.");
703 send_atom (cinfo->fd, emsg.from, "error");
707 func = erl_element (1, emsg.msg);
708 if (!ERL_IS_ATOM (func))
710 ERROR ("erlang plugin: First element is not an atom!");
711 send_atom (cinfo->fd, emsg.from, "error");
712 erl_free_term (func);
716 DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func));
718 if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
719 status = handle_dispatch_values (cinfo, &emsg);
720 else if (strcmp ("register_read", ERL_ATOM_PTR (func)) == 0)
721 status = handle_register_read (cinfo, &emsg);
724 ERROR ("erlang plugin: Received request for invalid function `%s'.",
725 ERL_ATOM_PTR (func));
726 send_atom (cinfo->fd, emsg.from, "error");
730 /* Check for fatal errors in the callback functions. */
733 ERROR ("erlang plugin: Handling request for `%s' failed.",
734 ERL_ATOM_PTR (func));
735 erl_free_term (func);
739 erl_free_term (func);
741 else if (emsg.type == ERL_EXIT)
743 DEBUG ("erlang plugin: handle_client_thread[%i]: "
744 "Received exit message.", cinfo->fd);
749 ERROR ("erlang plugin: Message type not handled: %i.", emsg.type);
753 erl_free_term (emsg.from);
755 erl_free_term (emsg.to);
757 erl_free_term (emsg.msg);
760 DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd);
765 pthread_exit ((void *) 0);
767 } /* }}} void *handle_client_thread */
/*
 * Creates the TCP socket on which the plugin accepts Erlang connections.
 * conf_service is resolved for IPv4 with getaddrinfo(); the node argument is
 * NULL, so conf_node does not influence the address used here. For each
 * returned address the erl_interface node is initialised with
 * erl_connect_xinit() and a socket is created, given SO_REUSEADDR, bound and
 * put into listening state with a backlog of 10. The port is then announced
 * with erl_publish(); the error message for a failure there points at epmd.
 */
769 static int create_listen_socket (void) /* {{{ */
771 struct addrinfo ai_hints;
772 struct addrinfo *ai_list;
773 struct addrinfo *ai_ptr;
780 memset (&ai_hints, 0, sizeof (ai_hints));
781 /* AI_PASSIVE => returns INADDR_ANY */
782 ai_hints.ai_flags = AI_PASSIVE;
784 ai_hints.ai_flags |= AI_ADDRCONFIG;
787 ai_hints.ai_family = AF_INET;
788 ai_hints.ai_socktype = SOCK_STREAM;
791 status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service,
792 &ai_hints, &ai_list);
795 ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status));
799 for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
801 struct sockaddr_in *sa_in;
802 struct in_addr *sin_addr;
805 assert (ai_ptr->ai_family == AF_INET);
806 sa_in = (struct sockaddr_in *) ai_ptr->ai_addr;
807 sin_addr = &sa_in->sin_addr;
808 numeric_serv = (int) ntohs (sa_in->sin_port);
810 /* Dunno if calling this multiple times is legal. Since it wants to have
811 * the sin_addr for some reason this is the best place to call this,
813 status = erl_connect_xinit (/* host name = */ conf_hostname,
814 /* plain node name = */ conf_nodename,
815 /* full node name = */ conf_fullname,
816 /* our address = */ sin_addr,
817 /* secret cookie = */ conf_cookie,
818 /* instance number = */ 0);
821 ERROR ("erlang plugin: erl_connect_xinit failed with status %i.",
826 sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
827 ai_ptr->ai_protocol);
830 ERROR ("erlang plugin: socket(2) failed.");
835 status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
838 ERROR ("erlang plugin: setsockopt(2) failed.");
844 status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
847 ERROR ("erlang plugin: bind(2) failed.");
853 status = listen (sock_descr, /* backlog = */ 10);
856 ERROR ("erlang plugin: listen(2) failed.");
863 } /* for (ai_list) */
865 freeaddrinfo (ai_list);
869 status = erl_publish (numeric_serv);
872 ERROR ("erlang plugin: erl_publish (%i) failed with status %i. "
873 "Is epmd running?", numeric_serv, status);
882 INFO ("erlang plugin: Created Erlang socket: Nodename %s, Port %i, "
884 conf_fullname, numeric_serv, conf_cookie);
888 } /* }}} int create_listen_socket */
/*
 * Thread function of the listener, started by ce_init(). Initialises
 * erl_interface, creates the listening socket and accepts connections with
 * erl_accept(). For every connection a ce_connection_info_t is allocated,
 * zeroed, given a copy of the connection data and handed to a detached
 * handle_client_thread().
 * NOTE(review): the return value of pthread_create() is ignored, so `arg'
 * would leak if the thread could not be created.
 */
890 void *listen_thread (void *arg) /* {{{ */
897 /* The erl_interface manual requires erl_init (NULL, 0) to be called before
898 * any other function of the library; the arguments must be given like this. */
899 erl_init (/* void *x = */ NULL, /* long y = */ 0);
901 listen = create_listen_socket ();
908 pthread_attr_t tattr;
909 ce_connection_info_t *arg;
911 fd = erl_accept (listen, &conn);
915 ERROR ("erlang plugin: erl_accept failed: %s",
916 sstrerror (erl_errno, errbuf, sizeof (errbuf)));
919 DEBUG ("erlang plugin: Got connection from %s on fd %i.",
922 pthread_attr_init (&tattr);
923 pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED);
925 arg = malloc (sizeof (*arg));
928 ERROR ("erlang plugin: malloc failed.");
932 memset (arg, 0, sizeof (*arg));
935 memcpy (&arg->conn, &conn, sizeof (conn));
937 pthread_create (&tid, &tattr, handle_client_thread, arg);
940 pthread_exit ((void *) 0);
942 } /* }}} void *listen_thread */
/*
 * Init callback: creates the listener thread unless listen_thread_running is
 * already set, and sets that flag afterwards.
 */
944 static int ce_init (void) /* {{{ */
946 if (!listen_thread_running)
950 status = pthread_create (&listen_thread_id,
955 listen_thread_running = true;
959 } /* }}} int ce_init */
/*
 * Config callback. The keys BindTo, BindPort, Cookie and NodeName are
 * compared case-insensitively and copied into the matching conf_* buffer.
 * NodeName may be given as `name' or `name@host'. Without an `@' the global
 * hostname_g supplies the host part and the full name is built as
 * `name@host'; with an `@' the text after it becomes the host name and the
 * whole value the full node name.
 */
961 static int ce_config (const char *key, const char *value) /* {{{ */
963 if (strcasecmp ("BindTo", key) == 0)
965 sstrncpy (conf_node, value, sizeof (conf_node));
967 else if (strcasecmp ("BindPort", key) == 0)
969 sstrncpy (conf_service, value, sizeof (conf_service));
971 else if (strcasecmp ("Cookie", key) == 0)
973 sstrncpy (conf_cookie, value, sizeof (conf_cookie));
975 else if (strcasecmp ("NodeName", key) == 0)
979 host = strchr (value, '@');
982 sstrncpy (conf_nodename, value, sizeof (conf_nodename));
983 sstrncpy (conf_hostname, hostname_g, sizeof (conf_hostname));
984 ssnprintf (conf_fullname, sizeof (conf_fullname), "%s@%s",
985 conf_nodename, conf_hostname);
987 else /* if (host != NULL) */
991 sstrncpy (conf_nodename, value, sizeof (conf_nodename));
992 sstrncpy (conf_hostname, host + 1, sizeof (conf_hostname));
993 sstrncpy (conf_fullname, value, sizeof (conf_fullname));
995 tmp = strchr (conf_nodename, '@');
1006 } /* }}} int ce_config */
/* Plugin entry point: registers the config and init callbacks under the name
 * "erlang". */
1008 void module_register (void)
1010 plugin_register_config ("erlang", ce_config, config_keys, config_keys_num);
1011 plugin_register_init ("erlang", ce_init);
1014 /* vim: set sw=2 ts=2 noet fdm=marker : */