2 * collectd - src/erlang.c
3 * Copyright (C) 2009 Florian octo Forster
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Florian octo Forster <octo at verplant.org>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
32 #include <erl_interface.h>
36 * Private data structures
38 struct ce_connection_info_s
43 typedef struct ce_connection_info_s ce_connection_info_t;
48 static pthread_t listen_thread_id;
49 static _Bool listen_thread_running = false;
51 static char conf_service[NI_MAXSERV] = "29157";
52 static char conf_cookie[256] = "ceisaequ";
53 static char conf_hostname[256] = "alyja";
54 static char conf_nodename[256] = "collectd";
55 static char conf_fullname[256] = "collectd@alyja.office.noris.de";
60 static int send_atom (int fd, ETERM *to, const char *atom) /* {{{ */
65 reply = erl_mk_atom (atom);
69 status = erl_send (fd, to, reply);
70 erl_free_term (reply);
76 } /* }}} int send_atom */
78 static int send_error (int fd, ETERM *to, const char *message) /* {{{ */
83 DEBUG ("erlang plugin: send_error: message = %s.", message);
84 reply = erl_format ("{~a,~s}", "error", message);
86 status = erl_send (fd, to, reply);
92 erl_free_term (reply);
95 } /* }}} int send_error */
97 static int eterm_to_int (const ETERM *term, int *ret_int) /* {{{ */
99 if ((term == NULL) || (ret_int == NULL))
102 switch (ERL_TYPE (term))
105 *ret_int = (int) ERL_INT_VALUE (term);
109 *ret_int = (int) ERL_INT_UVALUE (term);
113 *ret_int = (int) (ERL_FLOAT_VALUE (term) + .5);
118 *ret_int = (int) ERL_LL_VALUE (term);
120 #endif /* ERL_LONGLONG */
122 #ifdef ERL_U_LONGLONG
124 *ret_int = (int) ERL_LL_UVALUE (term);
126 #endif /* ERL_U_LONGLONG */
129 ERROR ("erlang plugin: Don't know how to cast "
130 "erlang type %#x to int.", (unsigned int) ERL_TYPE (term));
132 } /* switch (ERL_TYPE (term)) */
135 } /* }}} int eterm_to_int */
137 static int eterm_to_time_t (const ETERM *term, time_t *ret_time) /* {{{ */
139 if ((term == NULL) || (ret_time == NULL))
142 if (ERL_IS_NIL (term)
143 || (ERL_IS_ATOM (term)
144 && ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
145 || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))))
147 *ret_time = time (NULL);
151 switch (ERL_TYPE (term))
154 *ret_time = (time_t) ERL_INT_VALUE (term);
158 *ret_time = (time_t) ERL_INT_UVALUE (term);
162 if ((strcmp ("now", ERL_ATOM_PTR (term)) == 0)
163 || (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0))
165 *ret_time = time (NULL);
169 ERROR ("erlang plugin: Invalid atom for time: %s.",
170 ERL_ATOM_PTR (term));
176 *ret_time = (time_t) (ERL_FLOAT_VALUE (term) + .5);
181 *ret_time = (time_t) ERL_LL_VALUE (term);
183 #endif /* ERL_LONGLONG */
185 #ifdef ERL_U_LONGLONG
187 *ret_time = (time_t) ERL_LL_UVALUE (term);
189 #endif /* ERL_U_LONGLONG */
192 ERROR ("erlang plugin: Don't know how to cast "
193 "erlang type %#x to time_t.", (unsigned int) ERL_TYPE (term));
195 } /* switch (ERL_TYPE (term)) */
198 } /* }}} int eterm_to_time_t */
200 static int eterm_to_string (const ETERM *term, char *buffer, size_t buffer_size) /* {{{ */
204 if ((term == NULL) || (buffer == NULL) || (buffer_size <= 0))
207 memset (buffer, 0, buffer_size);
209 if (ERL_IS_EMPTY_LIST (term)
211 || (ERL_IS_ATOM (term)
212 && (strcmp ("undefined", ERL_ATOM_PTR (term)) == 0)))
218 if (!ERL_IS_LIST (term))
221 tmp = erl_iolist_to_string (term);
225 strncpy (buffer, tmp, buffer_size - 1);
229 } /* }}} int eterm_to_string */
231 static int eterm_to_value (const ETERM *term, int ds_type, /* {{{ */
234 if ((term == NULL) || (value == NULL))
237 switch (ERL_TYPE (term))
241 int v = ERL_INT_VALUE (term);
244 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
245 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
246 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
247 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
254 unsigned int v = ERL_INT_UVALUE (term);
257 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
258 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
259 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
260 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
267 double v = ERL_FLOAT_VALUE (term);
270 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
271 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
272 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
273 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
281 long long v = ERL_LL_VALUE (term);
284 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
285 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
286 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
287 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
291 #endif /* ERL_LONGLONG */
293 #ifdef ERL_U_LONGLONG
296 unsigned long long v = ERL_LL_UVALUE (term);
299 case DS_TYPE_COUNTER: value->counter = (counter_t) v; break;
300 case DS_TYPE_GAUGE: value->gauge = (gauge_t) v; break;
301 case DS_TYPE_DERIVE: value->derive = (derive_t) v; break;
302 case DS_TYPE_ABSOLUTE: value->absolute = (absolute_t) v; break;
306 #endif /* ERL_U_LONGLONG */
309 ERROR ("erlang plugin: Don't know how to cast "
310 "erlang type %#x to value_t.", (unsigned int) ERL_TYPE (term));
312 } /* switch (ERL_TYPE (term)) */
315 } /* }}} int eterm_to_value */
317 static int eterm_to_values (const ETERM *term, const data_set_t *ds, /* {{{ */
323 if ((term == NULL) || (ds == NULL) || (vl == NULL))
326 if (!ERL_IS_LIST (term))
333 while (!ERL_IS_EMPTY_LIST (term))
335 const ETERM *eterm_value;
338 if (ds_index >= ds->ds_num)
340 ds_index = ds->ds_num + 1;
345 tmp = realloc (vl->values, sizeof (*tmp) * (vl->values_len + 1));
353 eterm_value = ERL_CONS_HEAD (term);
354 term = ERL_CONS_TAIL (term);
356 status = eterm_to_value (eterm_value, ds->ds[ds_index].type,
357 vl->values + vl->values_len);
365 if ((status == 0) && (ds_index != ds->ds_num))
366 NOTICE ("erlang plugin: Incorrect number of values received for type %s: "
367 "Expected %i, got %i.", ds->type, ds->ds_num, ds_index);
369 if ((status != 0) || (ds_index != ds->ds_num))
378 } /* }}} int eterm_to_values */
380 static int eterm_to_value_list (const ETERM *term, value_list_t *vl) /* {{{ */
384 const data_set_t *ds;
386 if ((term == NULL) || (vl == NULL))
389 if (!ERL_IS_TUPLE (term) || (ERL_TUPLE_SIZE (term) != 9))
392 tmp = erl_element (1, term);
393 if (!ERL_IS_ATOM (tmp)
394 || (strcmp ("value_list", ERL_ATOM_PTR (tmp)) != 0))
404 #define TUPLE_ELEM_TO_CHAR_ARRAY(idx,buf) \
405 tmp = erl_element ((idx), term); \
406 status = eterm_to_string (tmp, (buf), sizeof (buf)); \
407 erl_free_term (tmp); \
411 TUPLE_ELEM_TO_CHAR_ARRAY (2, vl->host);
412 TUPLE_ELEM_TO_CHAR_ARRAY (3, vl->plugin);
413 TUPLE_ELEM_TO_CHAR_ARRAY (4, vl->plugin_instance);
414 TUPLE_ELEM_TO_CHAR_ARRAY (5, vl->type);
415 TUPLE_ELEM_TO_CHAR_ARRAY (6, vl->type_instance);
417 ds = plugin_get_ds (vl->type);
424 tmp = erl_element (7, term);
425 status = eterm_to_time_t (tmp, &vl->time);
430 tmp = erl_element (8, term);
431 status = eterm_to_int (tmp, &vl->interval);
435 if (vl->interval < 1)
436 vl->interval = interval_g;
438 tmp = erl_element (9, term);
439 status = eterm_to_values (tmp, ds, vl);
444 #undef TUPLE_ELEM_TO_CHAR_ARRAY
450 /* validate the struct */
451 if ((vl->host[0] == 0) || (vl->plugin[0] == 0) || (vl->type[0] == 0))
454 if (ds->ds_num != vl->values_len)
458 } /* }}} int eterm_to_value_list */
460 /* Returns non-zero only if the request could not be handled gracefully. */
461 static int handle_dispatch_values (ce_connection_info_t *cinfo, /* {{{ */
462 const ErlMessage *req)
468 memset (&vl, 0, sizeof (vl));
471 eterm_vl = erl_element (2, req->msg);
472 status = eterm_to_value_list (eterm_vl, &vl);
473 erl_free_term (eterm_vl);
478 send_error (cinfo->fd, req->from, "Cannot parse argument as value list.");
482 status = plugin_dispatch_values (&vl);
486 send_error (cinfo->fd, req->from, "plugin_dispatch_values failed.");
491 send_atom (cinfo->fd, req->from, "success");
494 } /* }}} int handle_dispatch_values */
496 static void *handle_client_thread (void *arg) /* {{{ */
498 ce_connection_info_t *cinfo;
500 unsigned char buffer[4096];
504 DEBUG ("erlang plugin: handle_client_thread[%i]: Handling client %s.",
505 cinfo->fd, cinfo->conn.nodename);
515 erl_free_term (emsg.from);
517 erl_free_term (emsg.to);
519 erl_free_term (emsg.msg);
522 status = erl_receive_msg (cinfo->fd, buffer, sizeof (buffer), &emsg);
523 if (status == ERL_TICK)
526 if (status == ERL_ERROR)
529 if (emsg.type == ERL_REG_SEND)
534 if (!ERL_IS_TUPLE (emsg.msg))
536 ERROR ("erlang plugin: Message is not a tuple.");
537 send_atom (cinfo->fd, emsg.from, "error");
541 func = erl_element (1, emsg.msg);
542 if (!ERL_IS_ATOM (func))
544 ERROR ("erlang plugin: First element is not an atom!");
545 send_atom (cinfo->fd, emsg.from, "error");
546 erl_free_term (func);
550 DEBUG ("erlang plugin: Wanted function is: %s.", ERL_ATOM_PTR (func));
552 if (strcmp ("dispatch_values", ERL_ATOM_PTR (func)) == 0)
553 status = handle_dispatch_values (cinfo, &emsg);
556 ERROR ("erlang plugin: Received request for invalid function `%s'.",
557 ERL_ATOM_PTR (func));
558 send_atom (cinfo->fd, emsg.from, "error");
562 /* Check for fatal errors in the callback functions. */
565 ERROR ("erlang plugin: Handling request for `%s' failed.",
566 ERL_ATOM_PTR (func));
567 erl_free_term (func);
571 erl_free_term (func);
573 else if (emsg.type == ERL_EXIT)
575 DEBUG ("erlang plugin: handle_client_thread[%i]: "
576 "Received exit message.", cinfo->fd);
581 ERROR ("erlang plugin: Message type not handled: %i.", emsg.type);
585 erl_free_term (emsg.from);
587 erl_free_term (emsg.to);
589 erl_free_term (emsg.msg);
592 DEBUG ("erlang plugin: handle_client_thread[%i]: Exiting.", cinfo->fd);
597 pthread_exit ((void *) 0);
599 } /* }}} void *handle_client_thread */
601 static int create_listen_socket (void) /* {{{ */
603 struct addrinfo ai_hints;
604 struct addrinfo *ai_list;
605 struct addrinfo *ai_ptr;
612 memset (&ai_hints, 0, sizeof (ai_hints));
613 /* AI_PASSIVE => returns INADDR_ANY */
614 ai_hints.ai_flags = AI_PASSIVE;
616 ai_hints.ai_flags |= AI_ADDRCONFIG;
619 ai_hints.ai_family = AF_INET;
620 ai_hints.ai_socktype = SOCK_STREAM;
623 status = getaddrinfo (/* node = */ NULL, /* service = */ conf_service,
624 &ai_hints, &ai_list);
627 ERROR ("erlang plugin: getaddrinfo failed: %s", gai_strerror (status));
631 for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
633 struct sockaddr_in *sa_in;
634 struct in_addr *sin_addr;
637 assert (ai_ptr->ai_family == AF_INET);
638 sa_in = (struct sockaddr_in *) ai_ptr->ai_addr;
639 sin_addr = &sa_in->sin_addr;
640 numeric_serv = (int) ntohs (sa_in->sin_port);
642 /* Dunno if calling this multiple times is legal. Since it wants to have
643 * the sin_addr for some reason this is the best place to call this,
645 status = erl_connect_xinit (/* host name = */ conf_hostname,
646 /* plain node name = */ conf_nodename,
647 /* full node name = */ conf_fullname,
648 /* our address = */ sin_addr,
649 /* secret cookie = */ conf_cookie,
650 /* instance number = */ 0);
653 ERROR ("erlang plugin: erl_connect_xinit failed with status %i.",
658 sock_descr = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
659 ai_ptr->ai_protocol);
662 ERROR ("erlang plugin: socket(2) failed.");
667 status = setsockopt (sock_descr, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
670 ERROR ("erlang plugin: setsockopt(2) failed.");
676 status = bind (sock_descr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
679 ERROR ("erlang plugin: bind(2) failed.");
685 status = listen (sock_descr, /* backlog = */ 10);
688 ERROR ("erlang plugin: listen(2) failed.");
695 } /* for (ai_list) */
697 freeaddrinfo (ai_list);
701 status = erl_publish (numeric_serv);
704 ERROR ("erlang plugin: erl_publish (%i) failed with status %i.", numeric_serv, status);
712 } /* }}} int create_listen_socket */
714 void *listen_thread (void *arg) /* {{{ */
721 /* I have no fucking idea what this does, nor what the arguments are. Didn't
722 * find any comprehensive docs yet. */
723 erl_init (/* void *x = */ NULL, /* long y = */ 0);
725 listen = create_listen_socket ();
732 pthread_attr_t tattr;
733 ce_connection_info_t *arg;
735 fd = erl_accept (listen, &conn);
738 ERROR ("erlang plugin: erl_accept failed with status %i.", fd);
742 DEBUG ("erlang plugin: Got connection from %s on fd %i.",
745 pthread_attr_init (&tattr);
746 pthread_attr_setdetachstate (&tattr, PTHREAD_CREATE_DETACHED);
748 arg = malloc (sizeof (*arg));
751 ERROR ("erlang plugin: malloc failed.");
755 memset (arg, 0, sizeof (*arg));
758 memcpy (&arg->conn, &conn, sizeof (conn));
760 pthread_create (&tid, &tattr, handle_client_thread, arg);
763 pthread_exit ((void *) 0);
765 } /* }}} void *listen_thread */
767 static int ce_init (void) /* {{{ */
769 if (!listen_thread_running)
773 status = pthread_create (&listen_thread_id,
778 listen_thread_running = true;
782 } /* }}} int ce_init */
784 void module_register (void)
786 plugin_register_init ("erlang", ce_init);
789 /* vim: set sw=2 ts=2 noet fdm=marker : */