2 * collectd - src/zeromq.c
3 * Copyright (C) 2005-2010 Florian octo Forster
4 * Copyright (C) 2009 Aman Gupta
5 * Copyright (C) 2010 Julien Ammous
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at verplant.org>
22 * Aman Gupta <aman at tmm1.net>
27 #include "common.h" /* auxiliary functions */
28 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
29 #include "utils_cache.h"
34 # include <arpa/inet.h>
39 #include "zeromq_borrowed.c"
/* Short alias for struct cmq_socket_s; the struct definition itself is not
 * visible in this view. */
46 typedef struct cmq_socket_s cmq_socket_t;
/* Value handed to zmq_init() when the context is created. The "Threads"
 * config option overrides it with any value >= 1. */
48 static int cmq_threads_num = 1;
/* Shared ZeroMQ context. Stays NULL until cmq_config_socket() initializes it
 * for the first socket block; passed to zmq_term() in cmq_shutdown(). */
49 static void *cmq_context = NULL;
/* Array of receive thread ids, grown with realloc by one entry per receiving
 * socket, and the number of entries in use. */
51 static pthread_t *receive_thread_ids = NULL;
52 static size_t receive_thread_num = 0;
/* Counter used to build the "zeromq/<n>" callback names for sending sockets. */
53 static int sending_sockets_num = 0;
/* Loop condition of receive_thread().
 * NOTE(review): plain _Bool read from the receive threads with no visible
 * synchronization -- confirm whether it should be atomic. */
55 static _Bool thread_running = 1;
/*
 * Close callback for a ZeroMQ socket handle.
 *
 * Calls zmq_close() on the given socket and discards the result.
 * cmq_config_socket() installs this function as the free_func of the
 * user_data it registers for sending sockets.
 *
 * NOTE(review): the lines between the signature and the zmq_close() call are
 * not visible in this view, so any guard on the argument cannot be confirmed.
 */
57 static void cmq_close_callback (void *socket) /* {{{ */
60 (void) zmq_close (socket);
61 } /* }}} void cmq_close_callback */
/*
 * Deallocation callback with a (data, hint) signature.
 *
 * write_notification() and write_value() pass it to zmq_msg_init_data(); per
 * the comments at those call sites, ZeroMQ calls it to release the message
 * buffer when it is no longer needed. Both callers pass NULL as the hint.
 *
 * NOTE(review): the body is not visible in this view -- presumably it frees
 * `data`; confirm before relying on that.
 */
63 static void free_data (void *data, void *hint) /* {{{ */
66 } /* }}} void free_data */
/*
 * Thread entry point for one receiving socket.
 *
 * Loops while thread_running is set: receives a message on cmq_socket, hands
 * its payload to parse_packet(), and releases the message. When the loop
 * ends, the thread itself closes the socket.
 *
 * cmq_socket: ZeroMQ socket handle passed as the pthread argument by
 *             cmq_config_socket(); must not be NULL.
 */
68 static void *receive_thread (void *cmq_socket) /* {{{ */
74 assert (cmq_socket != NULL);
76 while (thread_running)
/* A fresh message object is initialized on every iteration. */
80 (void) zmq_msg_init (&msg);
/* flags = 0, i.e. ZMQ_NOBLOCK is not requested for the receive. */
82 status = zmq_recv (cmq_socket, &msg, /* flags = */ 0);
/* EAGAIN and EINTR are singled out from other receive errors.
 * NOTE(review): the statement taken in this case is not visible here --
 * presumably the loop retries; confirm. */
85 if ((errno == EAGAIN) || (errno == EINTR))
/* Any other receive failure is logged with the ZeroMQ error text. */
88 ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno));
/* Pointer to and length of the received payload; both refer to storage
 * owned by msg. */
92 data = zmq_msg_data (&msg);
93 data_size = zmq_msg_size (&msg);
/* The payload is parsed with a NULL first argument and a NULL username; the
 * parse status is only reported through the DEBUG line below. */
95 status = parse_packet (NULL, data, data_size,
97 /* username = */ NULL);
98 DEBUG("zeromq plugin: received data, parse returned %d", status);
/* Release the message after parsing. */
100 (void) zmq_msg_close (&msg);
101 } /* while (thread_running) */
103 DEBUG ("zeromq plugin: Receive thread is terminating.");
/* The socket is closed from within the receiving thread. */
104 (void) zmq_close (cmq_socket);
107 } /* }}} void *receive_thread */
/* Size in bytes of the serialization buffer used by write_notification()
 * (a local array) and write_value() (a malloc'ed block). */
109 #define PACKET_SIZE 512
/*
 * Notification callback: serializes a notification and sends it as a single
 * ZeroMQ message.
 *
 * The notification is written into a PACKET_SIZE byte buffer as a sequence
 * of typed parts: time, severity, then host / plugin / plugin instance /
 * type / type instance (each only when non-empty), and finally the message.
 *
 * n:         notification to serialize.
 * user_data: its data member is the ZeroMQ socket to send on.
 *
 * NOTE(review): user_data carries __attribute__((unused)) but is
 * dereferenced below; the attribute looks stale.
 *
 * NOTE(review): `buffer` is a local array, yet it is handed to
 * zmq_msg_init_data() together with the free_data deallocator, while
 * write_value() uses a malloc'ed buffer for the same call. Check the
 * zmq_msg_init_data() ownership rules -- this looks like a buffer lifetime
 * problem.
 */
111 static int write_notification (const notification_t *n, /* {{{ */
112 __attribute__((unused)) user_data_t *user_data)
114 char buffer[PACKET_SIZE];
/* buffer_ptr / buffer_free are passed by address to the write_part_*
 * helpers for every part written. */
115 char *buffer_ptr = buffer;
116 int buffer_free = sizeof (buffer);
120 void *cmq_socket = user_data->data;
122 memset (buffer, '\0', sizeof (buffer));
/* Numeric parts: timestamp and severity, both cast to uint64_t.
 * NOTE(review): status is assigned after each part; the checks that follow
 * are not visible in this view. */
125 status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME, (uint64_t) n->time);
129 status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY, (uint64_t) n->severity);
/* Identifier parts: each string is written only when it is non-empty. */
133 if (strlen (n->host) > 0)
135 status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST, n->host, strlen (n->host));
140 if (strlen (n->plugin) > 0)
142 status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN, n->plugin, strlen (n->plugin));
147 if (strlen (n->plugin_instance) > 0)
149 status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN_INSTANCE, n->plugin_instance,
150 strlen (n->plugin_instance));
155 if (strlen (n->type) > 0)
157 status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE, n->type, strlen (n->type));
162 if (strlen (n->type_instance) > 0)
164 status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE, n->type_instance,
165 strlen (n->type_instance));
/* The message text is written without an emptiness test. */
170 status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE, n->message, strlen (n->message));
174 // zeromq will free the memory when needed by calling the free_data function
/* Message length is the buffer size minus the space still free. */
175 if( zmq_msg_init_data(&msg, buffer, sizeof(buffer) - buffer_free, free_data, NULL) != 0 ) {
176 ERROR("zmq_msg_init : %s", zmq_strerror(errno));
180 // try to send the message
/* Sent with flags = 0, whereas write_value() passes ZMQ_NOBLOCK. */
181 if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
/* EAGAIN is reported as a warning, every other failure as an error. */
182 if( errno == EAGAIN ) {
183 WARNING("ZeroMQ: Cannot send message, queue is full");
186 ERROR("zmq_send : %s", zmq_strerror(errno));
192 } /* }}} int write_notification */
/*
 * Write callback: encodes one value list and sends it as a single ZeroMQ
 * message without blocking.
 *
 * A PACKET_SIZE byte buffer is allocated, zeroed and filled by
 * add_to_buffer(). It is then wrapped in a message with free_data as its
 * deallocator and sent with ZMQ_NOBLOCK.
 *
 * ds:        data set describing vl.
 * vl:        value list to encode.
 * user_data: its data member is the ZeroMQ socket to send on.
 */
194 static int write_value (const data_set_t *ds, /* {{{ */
195 const value_list_t *vl,
196 user_data_t *user_data)
198 void *cmq_socket = user_data->data;
202 int send_buffer_size = PACKET_SIZE, real_size;
/* Heap buffer: it is handed to ZeroMQ below together with free_data. */
204 send_buffer = malloc(PACKET_SIZE);
205 if( send_buffer == NULL ) {
206 ERROR("Unable to allocate memory for send_buffer, aborting write");
211 memset(send_buffer, 0, PACKET_SIZE);
/* NOTE(review): real_size is used as the message length below and no check
 * of it is visible in this view -- confirm how add_to_buffer() reports
 * failure. */
213 real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
215 // zeromq will free the memory when needed by calling the free_data function
216 if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
217 ERROR("zmq_msg_init : %s", zmq_strerror(errno));
221 // try to send the message
/* Non-blocking send: EAGAIN is reported as a warning, every other failure
 * as an error. */
222 if( zmq_send(cmq_socket, &msg, ZMQ_NOBLOCK) != 0 ) {
223 if( errno == EAGAIN ) {
224 WARNING("ZeroMQ: Unable to queue message, queue may be full");
228 ERROR("zmq_send : %s", zmq_strerror(errno));
233 DEBUG("ZeroMQ: data sent");
236 } /* }}} int write_value */
/*
 * Parses the communication pattern given as the string argument of a config
 * item.
 *
 * The argument is copied into a 64 byte buffer and compared, ignoring case,
 * against "Publish", "Subscribe", "Push" and "Pull". Any other value is
 * logged as an error.
 *
 * NOTE(review): the value returned for each branch is not visible in this
 * view. cmq_config_socket() compares the result with ZMQ_PUB, ZMQ_SUB,
 * ZMQ_PUSH and ZMQ_PULL, so presumably those constants are returned.
 */
238 static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */
240 char buffer[64] = "";
243 status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
247 if (strcasecmp ("Publish", buffer) == 0)
249 else if (strcasecmp ("Subscribe", buffer) == 0)
251 else if (strcasecmp ("Push", buffer) == 0)
253 else if (strcasecmp ("Pull", buffer) == 0)
256 ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"",
259 } /* }}} int cmq_config_mode */
/*
 * Handles one "Socket" config block.
 *
 * Steps visible in this function:
 *   1. Determine the socket type from the block argument (cmq_config_mode).
 *   2. Create the shared ZeroMQ context on first use.
 *   3. Create the socket and apply the "Endpoint" and "HWM" child options.
 *      ZMQ_SUB / ZMQ_PULL sockets bind to their endpoints, ZMQ_PUB / ZMQ_PUSH
 *      sockets connect to them.
 *   4. For receiving sockets start a receive_thread(); for sending sockets
 *      register write_value() and write_notification() callbacks.
 *
 * ci: the "Socket" config item; its children are the per-socket options.
 */
261 static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
269 type = cmq_config_mode (ci);
/* The context is created lazily, using the thread count configured so far. */
273 if (cmq_context == NULL)
275 cmq_context = zmq_init (cmq_threads_num);
276 if (cmq_context == NULL)
278 ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s",
279 zmq_strerror (errno));
283 INFO("ZeroMQ: Using %d threads", cmq_threads_num);
286 /* Create a new socket */
287 cmq_socket = zmq_socket (cmq_context, type);
288 if (cmq_socket == NULL)
290 ERROR ("zeromq plugin: zmq_socket failed: %s",
291 zmq_strerror (errno));
/* An empty prefix of length 0 is used as the subscription filter.
 * NOTE(review): the condition guarding this call is not visible here. */
297 /* Subscribe to all messages */
298 status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE,
299 /* prefix = */ "", /* prefix length = */ 0);
302 ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s",
303 zmq_strerror (errno));
304 (void) zmq_close (cmq_socket);
309 /* Iterate over all children and do all the binds and connects requested. */
311 for (i = 0; i < ci->children_num; i++)
313 oconfig_item_t *child = ci->children + i;
315 if (strcasecmp ("Endpoint", child->key) == 0)
319 status = cf_util_get_string (child, &value);
/* Receiving socket types bind to the endpoint ... */
323 if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
325 DEBUG("Binding to %s", value);
326 status = zmq_bind (cmq_socket, value);
329 ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
330 value, zmq_strerror (errno));
/* ... while sending socket types connect to it. */
335 else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
337 DEBUG("Connecting to %s", value);
338 status = zmq_connect (cmq_socket, value);
341 ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s",
342 value, zmq_strerror (errno));
/* "HWM": the integer option value is cast to uint64_t and applied with
 * the ZMQ_HWM socket option. */
357 else if( strcasecmp("HWM", child->key) == 0 )
362 status = cf_util_get_int(child, &tmp);
366 hwm = (uint64_t) tmp;
368 status = zmq_setsockopt (cmq_socket, ZMQ_HWM, &hwm, sizeof(hwm));
370 ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_HWM) failed: %s", zmq_strerror (errno));
371 (void) zmq_close (cmq_socket);
/* NOTE(review): "now allowed" in the message below looks like a typo for
 * "not allowed" (compare the equivalent warning in cmq_config). */
379 ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
382 } /* for (i = 0; i < ci->children_num; i++) */
/* A socket block without any usable endpoint is rejected and the socket is
 * closed. The code that increments endpoints_num is not visible here. */
384 if (endpoints_num == 0)
386 ERROR ("zeromq plugin: No (valid) \"Endpoint\" option was found in this "
387 "\"Socket\" block.");
388 (void) zmq_close (cmq_socket);
392 /* If this is a receiving socket, create a new receive thread */
393 if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
395 pthread_t *thread_ptr;
/* Grow the thread id array by one slot. The result goes through a temporary
 * so the existing array is not lost if realloc fails. */
397 thread_ptr = realloc (receive_thread_ids,
398 sizeof (*receive_thread_ids) * (receive_thread_num + 1));
399 if (thread_ptr == NULL)
401 ERROR ("zeromq plugin: realloc failed.");
/* thread_ptr now refers to the newly added slot at the end of the array. */
404 receive_thread_ids = thread_ptr;
405 thread_ptr = receive_thread_ids + receive_thread_num;
/* The socket handle is the thread argument; receive_thread() closes it
 * when its loop ends. */
407 status = pthread_create (thread_ptr,
409 /* func = */ receive_thread,
410 /* args = */ cmq_socket);
/* NOTE(review): pthread_create() reports failure through its return value
 * (stored in status), not through errno -- the message below probably
 * formats the wrong value. */
414 ERROR ("zeromq plugin: pthread_create failed: %s",
415 sstrerror (errno, errbuf, sizeof (errbuf)));
416 (void) zmq_close (cmq_socket);
420 receive_thread_num++;
423 /* If this is a sending socket, register a new write function */
424 else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
426 user_data_t ud = { NULL, NULL };
/* The socket becomes the callback user data and cmq_close_callback its
 * free function. */
429 ud.data = cmq_socket;
430 ud.free_func = cmq_close_callback;
/* NOTE(review): the counter is incremented between the two names, so the
 * write callback is named "zeromq/<n>" while the notification callback is
 * named "zeromq/<n+1>/notif". */
432 ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
433 sending_sockets_num++;
435 plugin_register_write (name, write_value, &ud);
437 ssnprintf (name, sizeof (name), "zeromq/%i/notif", sending_sockets_num);
/* NOTE(review): the same ud, including its free_func, is registered a second
 * time here. If both registrations invoke free_func, the socket would be
 * closed twice -- verify. */
439 plugin_register_notification (name, write_notification, &ud);
443 } /* }}} int cmq_config_socket */
453 * Endpoint "tcp://localhost:6666"
456 * Endpoint "tcp://eth0:6666"
457 * Endpoint "tcp://collectd.example.com:6666"
/*
 * Top-level configuration callback for the plugin block.
 *
 * Walks the children of ci:
 *   "Socket"  -> handled by cmq_config_socket().
 *   "Threads" -> integer; values >= 1 are stored in cmq_threads_num.
 *   other     -> a warning is logged.
 *
 * NOTE(review): cmq_config_socket() creates the ZeroMQ context on first use
 * with the current cmq_threads_num, so a "Threads" option placed after the
 * first "Socket" block does not appear to affect the context.
 */
461 static int cmq_config (oconfig_item_t *ci) /* {{{ */
466 for (i = 0; i < ci->children_num; i++)
468 oconfig_item_t *child = ci->children + i;
470 if (strcasecmp ("Socket", child->key) == 0)
471 status = cmq_config_socket (child);
472 else if (strcasecmp ("Threads", child->key) == 0)
475 status = cf_util_get_int (child, &tmp);
/* Only a successfully parsed value of at least 1 replaces the default. */
476 if ((status == 0) && (tmp >= 1))
477 cmq_threads_num = tmp;
481 WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.",
487 } /* }}} int cmq_config */
/*
 * Init callback: queries the ZeroMQ library version with zmq_version() and
 * logs it at INFO level.
 */
489 static int plugin_init (void) /* {{{ */
491 int major, minor, patch;
492 zmq_version (&major, &minor, &patch);
493 INFO("ZeroMQ plugin loaded (zeromq v%d.%d.%d).", major, minor, patch);
495 } /* }}} int plugin_init */
/*
 * Shutdown callback: waits for every receive thread and then terminates the
 * ZeroMQ context.
 *
 * NOTE(review): the threads are joined before zmq_term() is called, while
 * receive_thread() calls zmq_recv() with flags = 0. Confirm that a thread
 * waiting in zmq_recv() can actually observe the shutdown request; otherwise
 * the join may never return.
 */
497 static int cmq_shutdown (void) /* {{{ */
/* Special case for a context that was never created; the action taken is
 * not visible in this view. */
501 if (cmq_context == NULL)
504 /* Signal thread to shut down */
507 DEBUG ("ZeroMQ plugin: Waiting for %zu receive thread(s) to shut down.",
/* Join every receive thread recorded by cmq_config_socket(); the thread
 * return value is not collected. */
510 for (i = 0; i < receive_thread_num; i++)
511 pthread_join (receive_thread_ids[i], /* return ptr = */ NULL);
/* Terminate the context; a failure is logged with the ZeroMQ error text. */
513 if (zmq_term (cmq_context) != 0)
515 ERROR ("ZeroMQ plugin: zmq_term failed: %s", zmq_strerror (errno));
520 } /* }}} int cmq_shutdown */
/*
 * Plugin entry point: registers the config, init and shutdown callbacks
 * under the name "zeromq".
 */
522 void module_register (void)
524 plugin_register_complex_config("zeromq", cmq_config);
525 plugin_register_init("zeromq", plugin_init);
526 plugin_register_shutdown ("zeromq", cmq_shutdown);
529 /* vim: set sw=2 sts=2 et fdm=marker : */