e5e1836c4e314073e6af651ec4cfbaa3da7d6945
[collectd.git] / src / zeromq.c
1 /**
2  * collectd - src/zeromq.c
3  * Copyright (C) 2005-2010  Florian octo Forster
4  * Copyright (C) 2009       Aman Gupta
5  * Copyright (C) 2010       Julien Ammous
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at verplant.org>
22  *   Aman Gupta <aman at tmm1.net>
23  *   Julien Ammous
24  **/
25
26 #include "collectd.h"
27 #include "common.h" /* auxiliary functions */
28 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
29 #include "utils_cache.h"
30 #include "network.h"
31
32 /* for htons() */
33 #if HAVE_ARPA_INET_H
34 # include <arpa/inet.h>
35 #endif
36 #include <pthread.h>
37 #include <zmq.h>
38
39 #include "zeromq_borrowed.c"
40
41 struct cmq_socket_s
42 {
43         void *socket;
44         int type;
45 };
46 typedef struct cmq_socket_s cmq_socket_t;
47
48 static int cmq_threads_num = 1;
49 static void *cmq_context = NULL;
50
51 static pthread_t *receive_thread_ids = NULL;
52 static size_t     receive_thread_num = 0;
53 static int        sending_sockets_num = 0;
54
55 static _Bool thread_running = 1;
56
57 static void cmq_close_callback (void *socket) /* {{{ */
58 {
59   if (socket != NULL)
60     (void) zmq_close (socket);
61 } /* }}} void cmq_close_callback */
62
63 static void free_data (void *data, void *hint) /* {{{ */
64 {
65   free (data);
66 } /* }}} void free_data */
67
68 static void *receive_thread (void *cmq_socket) /* {{{ */
69 {
70   int status;
71   char *data = NULL;
72   size_t data_size;
73
74   assert (cmq_socket != NULL);
75
76   while (thread_running)
77   {
78     zmq_msg_t msg;
79
80     (void) zmq_msg_init (&msg);
81
82     status = zmq_recv (cmq_socket, &msg, /* flags = */ 0);
83     if (status != 0)
84     {
85       if ((errno == EAGAIN) || (errno == EINTR))
86         continue;
87
88       ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno));
89       break;
90     }
91
92     data = zmq_msg_data (&msg);
93     data_size = zmq_msg_size (&msg);
94
95     status = parse_packet (NULL, data, data_size,
96         /* flags = */ 0,
97         /* username = */ NULL);
98     DEBUG("zeromq plugin: received data, parse returned %d", status);
99
100     (void) zmq_msg_close (&msg);
101   } /* while (thread_running) */
102
103   DEBUG ("zeromq plugin: Receive thread is terminating.");
104   (void) zmq_close (cmq_socket);
105   
106   return (NULL);
107 } /* }}} void *receive_thread */
108
109 #define PACKET_SIZE   512
110
111 static int write_notification (const notification_t *n, /* {{{ */
112     __attribute__((unused)) user_data_t *user_data)
113 {
114   char        buffer[PACKET_SIZE];
115   char       *buffer_ptr = buffer;
116   int         buffer_free = sizeof (buffer);
117   int         status;
118   zmq_msg_t   msg;
119   
120   void *cmq_socket = user_data->data;
121
122   memset (buffer, '\0', sizeof (buffer));
123
124
125   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME, (uint64_t) n->time);
126   if (status != 0)
127     return (-1);
128
129   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY, (uint64_t) n->severity);
130   if (status != 0)
131     return (-1);
132
133   if (strlen (n->host) > 0)
134   {
135     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST, n->host, strlen (n->host));
136     if (status != 0)
137       return (-1);
138   }
139
140   if (strlen (n->plugin) > 0)
141   {
142     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN, n->plugin, strlen (n->plugin));
143     if (status != 0)
144       return (-1);
145   }
146
147   if (strlen (n->plugin_instance) > 0)
148   {
149     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN_INSTANCE, n->plugin_instance,
150       strlen (n->plugin_instance));
151     if (status != 0)
152       return (-1);
153   }
154
155   if (strlen (n->type) > 0)
156   {
157     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE, n->type, strlen (n->type));
158     if (status != 0)
159       return (-1);
160   }
161
162   if (strlen (n->type_instance) > 0)
163   {
164     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE, n->type_instance,
165       strlen (n->type_instance));
166     if (status != 0)
167       return (-1);
168   }
169
170   status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE, n->message, strlen (n->message));
171   if (status != 0)
172     return (-1);
173   
174   // zeromq will free the memory when needed by calling the free_data function
175   if( zmq_msg_init_data(&msg, buffer, sizeof(buffer) - buffer_free, free_data, NULL) != 0 ) {
176     ERROR("zmq_msg_init : %s", zmq_strerror(errno));
177     return 1;
178   }
179
180   // try to send the message
181   if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
182     if( errno == EAGAIN ) {
183       WARNING("ZeroMQ: Cannot send message, queue is full");
184     }
185     else {
186       ERROR("zmq_send : %s", zmq_strerror(errno));
187       return 1;
188     }
189   }
190   
191   return 0;
192 } /* }}} int write_notification */
193
194 static int write_value (const data_set_t *ds, /* {{{ */
195     const value_list_t *vl,
196     user_data_t *user_data)
197 {
198   void *cmq_socket = user_data->data;
199
200   zmq_msg_t msg;
201   char      *send_buffer;
202   int       send_buffer_size = PACKET_SIZE, real_size;
203
204   send_buffer = malloc(PACKET_SIZE);
205   if( send_buffer == NULL ) {
206     ERROR("Unable to allocate memory for send_buffer, aborting write");
207     return 1;
208   }
209
210   // empty buffer
211   memset(send_buffer, 0, PACKET_SIZE);
212
213   real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
214
215   // zeromq will free the memory when needed by calling the free_data function
216   if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
217     ERROR("zmq_msg_init : %s", zmq_strerror(errno));
218     return 1;
219   }
220
221   // try to send the message
222   if( zmq_send(cmq_socket, &msg, ZMQ_NOBLOCK) != 0 ) {
223     if( errno == EAGAIN ) {
224       WARNING("ZeroMQ: Unable to queue message, queue may be full");
225       return -1;
226     }
227     else {
228       ERROR("zmq_send : %s", zmq_strerror(errno));
229       return -1;
230     }
231   }
232
233   DEBUG("ZeroMQ: data sent");
234
235   return 0;
236 } /* }}} int write_value */
237
238 static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */
239 {
240   char buffer[64] = "";
241   int status;
242
243   status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
244   if (status != 0)
245     return (-1);
246
247   if (strcasecmp ("Publish", buffer) == 0)
248     return (ZMQ_PUB);
249   else if (strcasecmp ("Subscribe", buffer) == 0)
250     return (ZMQ_SUB);
251   else if (strcasecmp ("Push", buffer) == 0)
252     return (ZMQ_PUSH);
253   else if (strcasecmp ("Pull", buffer) == 0)
254     return (ZMQ_PULL);
255   
256   ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"",
257       buffer);
258   return (-1);
259 } /* }}} int cmq_config_mode */
260
261 static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
262 {
263   int type;
264   int status;
265   int i;
266   int endpoints_num;
267   void *cmq_socket;
268
269   type = cmq_config_mode (ci);
270   if (type < 0)
271     return (-1);
272
273   if (cmq_context == NULL)
274   {
275     cmq_context = zmq_init (cmq_threads_num);
276     if (cmq_context == NULL)
277     {
278       ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s",
279           zmq_strerror (errno));
280       return (-1);
281     }
282     
283     INFO("ZeroMQ: Using %d threads", cmq_threads_num);
284   }
285
286   /* Create a new socket */
287   cmq_socket = zmq_socket (cmq_context, type);
288   if (cmq_socket == NULL)
289   {
290     ERROR ("zeromq plugin: zmq_socket failed: %s",
291         zmq_strerror (errno));
292     return (-1);
293   }
294
295   if (type == ZMQ_SUB)
296   {
297     /* Subscribe to all messages */
298     status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE,
299         /* prefix = */ "", /* prefix length = */ 0);
300     if (status != 0)
301     {
302       ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s",
303           zmq_strerror (errno));
304       (void) zmq_close (cmq_socket);
305       return (-1);
306     }
307   }
308
309   /* Iterate over all children and do all the binds and connects requested. */
310   endpoints_num = 0;
311   for (i = 0; i < ci->children_num; i++)
312   {
313     oconfig_item_t *child = ci->children + i;
314
315     if (strcasecmp ("Endpoint", child->key) == 0)
316     {
317       char *value = NULL;
318
319       status = cf_util_get_string (child, &value);
320       if (status != 0)
321         continue;
322
323       if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
324       {
325         DEBUG("Binding to %s", value);
326         status = zmq_bind (cmq_socket, value);
327         if (status != 0)
328         {
329           ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
330               value, zmq_strerror (errno));
331           sfree (value);
332           continue;
333         }
334       }
335       else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
336       {
337         DEBUG("Connecting to %s", value);
338         status = zmq_connect (cmq_socket, value);
339         if (status != 0)
340         {
341           ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s",
342               value, zmq_strerror (errno));
343           sfree (value);
344           continue;
345         }
346       }
347       else
348       {
349         assert (23 == 42);
350       }
351       
352       sfree (value);
353
354       endpoints_num++;
355       continue;
356     } /* Endpoint */
357     else if( strcasecmp("HWM", child->key) == 0 )
358     {
359       int tmp;
360       uint64_t hwm;
361       
362       status = cf_util_get_int(child, &tmp);
363       if( status != 0 )
364         continue;
365       
366       hwm = (uint64_t) tmp;
367       
368       status = zmq_setsockopt (cmq_socket, ZMQ_HWM, &hwm, sizeof(hwm));
369       if (status != 0) {
370         ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_HWM) failed: %s", zmq_strerror (errno));
371         (void) zmq_close (cmq_socket);
372         return (-1);
373       }
374       
375       continue;
376     } /* HWM */
377     else
378     {
379       ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
380           child->key);
381     }
382   } /* for (i = 0; i < ci->children_num; i++) */
383
384   if (endpoints_num == 0)
385   {
386     ERROR ("zeromq plugin: No (valid) \"Endpoint\" option was found in this "
387         "\"Socket\" block.");
388     (void) zmq_close (cmq_socket);
389     return (-1);
390   }
391
392   /* If this is a receiving socket, create a new receive thread */
393   if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
394   {
395     pthread_t *thread_ptr;
396
397     thread_ptr = realloc (receive_thread_ids,
398         sizeof (*receive_thread_ids) * (receive_thread_num + 1));
399     if (thread_ptr == NULL)
400     {
401       ERROR ("zeromq plugin: realloc failed.");
402       return (-1);
403     }
404     receive_thread_ids = thread_ptr;
405     thread_ptr = receive_thread_ids + receive_thread_num;
406
407     status = pthread_create (thread_ptr,
408         /* attr = */ NULL,
409         /* func = */ receive_thread,
410         /* args = */ cmq_socket);
411     if (status != 0)
412     {
413       char errbuf[1024];
414       ERROR ("zeromq plugin: pthread_create failed: %s",
415           sstrerror (errno, errbuf, sizeof (errbuf)));
416       (void) zmq_close (cmq_socket);
417       return (-1);
418     }
419
420     receive_thread_num++;
421   }
422
423   /* If this is a sending socket, register a new write function */
424   else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
425   {
426     user_data_t ud = { NULL, NULL };
427     char name[20];
428
429     ud.data = cmq_socket;
430     ud.free_func = cmq_close_callback;
431
432     ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
433     sending_sockets_num++;
434     
435     plugin_register_write (name, write_value, &ud);
436     
437     ssnprintf (name, sizeof (name), "zeromq/%i/notif", sending_sockets_num);
438     
439     plugin_register_notification (name, write_notification, &ud);
440   }
441
442   return (0);
443 } /* }}} int cmq_config_socket */
444
445 /*
446  * Config schema:
447  *
448  * <Plugin "zeromq">
449  *   Threads 2
450  *
451  *   <Socket Publish>
452  *     HWM 300
453  *     Endpoint "tcp://localhost:6666"
454  *   </Socket>
455  *   <Socket Subscribe>
456  *     Endpoint "tcp://eth0:6666"
457  *     Endpoint "tcp://collectd.example.com:6666"
458  *   </Socket>
459  * </Plugin>
460  */
461 static int cmq_config (oconfig_item_t *ci) /* {{{ */
462 {
463   int status;
464   int i;
465   
466   for (i = 0; i < ci->children_num; i++)
467   {
468     oconfig_item_t *child = ci->children + i;
469
470     if (strcasecmp ("Socket", child->key) == 0)
471       status = cmq_config_socket (child);
472     else if (strcasecmp ("Threads", child->key) == 0)
473     {
474       int tmp = 0;
475       status = cf_util_get_int (child, &tmp);
476       if ((status == 0) && (tmp >= 1))
477         cmq_threads_num = tmp;
478     }
479     else
480     {
481       WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.",
482           child->key);
483     }
484   }
485
486   return (0);
487 } /* }}} int cmq_config */
488
489 static int plugin_init (void) /* {{{ */
490 {
491   int major, minor, patch;
492   zmq_version (&major, &minor, &patch);
493   INFO("ZeroMQ plugin loaded (zeromq v%d.%d.%d).", major, minor, patch);
494   return 0;
495 } /* }}} int plugin_init */
496
497 static int cmq_shutdown (void) /* {{{ */
498 {
499   size_t i;
500
501   if (cmq_context == NULL)
502     return (0);
503
504   /* Signal thread to shut down */
505   thread_running = 0;
506     
507   DEBUG ("ZeroMQ plugin: Waiting for %zu receive thread(s) to shut down.",
508       receive_thread_num);
509     
510   for (i = 0; i < receive_thread_num; i++)
511     pthread_join (receive_thread_ids[i], /* return ptr = */ NULL);
512
513   if (zmq_term (cmq_context) != 0)
514   {
515     ERROR ("ZeroMQ plugin: zmq_term failed: %s", zmq_strerror (errno));
516     return 1;
517   }
518     
519   return 0;
520 } /* }}} int cmq_shutdown */
521
522 void module_register (void)
523 {
524   plugin_register_complex_config("zeromq", cmq_config);
525   plugin_register_init("zeromq", plugin_init);
526   plugin_register_shutdown ("zeromq", cmq_shutdown);
527 }
528
529 /* vim: set sw=2 sts=2 et fdm=marker : */