zeromq plugin: Don't distinguish between "Bind" and "Connect".
[collectd.git] / src / zeromq.c
1 /**
2  * collectd - src/zeromq.c
3  * Copyright (C) 2005-2009  Florian octo Forster
4  * Copyright (C) 2009       Aman Gupta
5  * Copyright (C) 2010       Julien Ammous
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at verplant.org>
22  *   Aman Gupta <aman at tmm1.net>
23  *   Julien Ammous
24  **/
25
26 #include "collectd.h"
27 #include "common.h" /* auxiliary functions */
28 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
29 #include "utils_cache.h"
30 #include "network.h"
31
32 /* for htons() */
33 #if HAVE_ARPA_INET_H
34 # include <arpa/inet.h>
35 #endif
36 #include <pthread.h>
37 #include <zmq.h>
38
39 // copy/paste from network.c ...
40
41 static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
42
43 static uint64_t stats_values_not_dispatched = 0;
44 static uint64_t stats_values_dispatched = 0;
45
46 struct cmq_socket_s
47 {
48         void *socket;
49         int type;
50 };
51 typedef struct cmq_socket_s cmq_socket_t;
52
53 static int cmq_threads_num = 1;
54 static void *cmq_context = NULL;
55
56 static pthread_t *receive_thread_ids = NULL;
57 static size_t     receive_thread_num = 0;
58 static int        sending_sockets_num = 0;
59
60 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
61  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
62  * +-------+-----------------------+-------------------------------+
63  * ! Ver.  !                       ! Length                        !
64  * +-------+-----------------------+-------------------------------+
65  */
66 struct part_header_s
67 {
68         uint16_t type;
69         uint16_t length;
70 };
71 typedef struct part_header_s part_header_t;
72
73 // we do not want to crypt here
74 #undef HAVE_LIBGCRYPT
75
76 static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */
77 {
78   uint64_t time_sent = 0;
79   int status;
80
81   status = uc_meta_data_get_unsigned_int (vl,
82       "zeromq:time_sent", &time_sent);
83
84   /* This is a value we already sent. Don't allow it to be received again in
85    * order to avoid looping. */
86   if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
87     return (0);
88
89   return (1);
90 } /* }}} _Bool check_receive_okay */
91
92 static int network_dispatch_values (value_list_t *vl, /* {{{ */
93     const char *username)
94 {
95   int status;
96   
97   // DEBUG("host: %s", vl->host);
98   // DEBUG("plugin: %s", vl->plugin);
99   // DEBUG("plugin_instance: %s", vl->plugin_instance);
100   // DEBUG("type: %s", vl->type);
101   // DEBUG("type_instance: %s", vl->type_instance);
102
103   if ((vl->time <= 0)
104       || (strlen (vl->host) <= 0)
105       || (strlen (vl->plugin) <= 0)
106       || (strlen (vl->type) <= 0))
107     return (-EINVAL);
108   
109   if (!check_receive_okay (vl))
110   {
111 #if COLLECT_DEBUG
112     char name[6*DATA_MAX_NAME_LEN];
113     FORMAT_VL (name, sizeof (name), vl);
114     name[sizeof (name) - 1] = 0;
115     DEBUG ("network plugin: network_dispatch_values: "
116         "NOT dispatching %s.", name);
117 #endif
118     stats_values_not_dispatched++;
119     return (0);
120   }
121
122   assert (vl->meta == NULL);
123
124   vl->meta = meta_data_create ();
125   if (vl->meta == NULL)
126   {
127     ERROR ("network plugin: meta_data_create failed.");
128     return (-ENOMEM);
129   }
130
131   status = meta_data_add_boolean (vl->meta, "zeromq:received", 1);
132   if (status != 0)
133   {
134     ERROR ("network plugin: meta_data_add_boolean failed.");
135     meta_data_destroy (vl->meta);
136     vl->meta = NULL;
137     return (status);
138   }
139
140   if (username != NULL)
141   {
142     status = meta_data_add_string (vl->meta, "zeromq:username", username);
143     if (status != 0)
144     {
145       ERROR ("network plugin: meta_data_add_string failed.");
146       meta_data_destroy (vl->meta);
147       vl->meta = NULL;
148       return (status);
149     }
150   }
151   
152   // DEBUG("dispatching %d values", vl->values_len);
153   plugin_dispatch_values (vl);
154   stats_values_dispatched++;
155
156   meta_data_destroy (vl->meta);
157   vl->meta = NULL;
158
159   return (0);
160 } /* }}} int network_dispatch_values */
161
162 static int write_part_values (char **ret_buffer, int *ret_buffer_len, const data_set_t *ds, const value_list_t *vl)
163 {
164   char *packet_ptr;
165   int packet_len;
166   int num_values;
167
168   part_header_t pkg_ph;
169   uint16_t      pkg_num_values;
170   uint8_t      *pkg_values_types;
171   value_t      *pkg_values;
172
173   int offset;
174   int i;
175
176   num_values = vl->values_len;
177   packet_len = sizeof (part_header_t) + sizeof (uint16_t)
178     + (num_values * sizeof (uint8_t))
179     + (num_values * sizeof (value_t));
180
181   if (*ret_buffer_len < packet_len)
182     return (-1);
183
184   pkg_values_types = (uint8_t *) malloc (num_values * sizeof (uint8_t));
185   if (pkg_values_types == NULL)
186   {
187     ERROR ("network plugin: write_part_values: malloc failed.");
188     return (-1);
189   }
190
191   pkg_values = (value_t *) malloc (num_values * sizeof (value_t));
192   if (pkg_values == NULL)
193   {
194     free (pkg_values_types);
195     ERROR ("network plugin: write_part_values: malloc failed.");
196     return (-1);
197   }
198
199   pkg_ph.type = htons (TYPE_VALUES);
200   pkg_ph.length = htons (packet_len);
201
202   pkg_num_values = htons ((uint16_t) vl->values_len);
203
204   for (i = 0; i < num_values; i++)
205   {
206     pkg_values_types[i] = (uint8_t) ds->ds[i].type;
207     switch (ds->ds[i].type)
208     {
209       case DS_TYPE_COUNTER:
210         pkg_values[i].counter = htonll (vl->values[i].counter);
211         break;
212
213       case DS_TYPE_GAUGE:
214         pkg_values[i].gauge = htond (vl->values[i].gauge);
215         break;
216
217       case DS_TYPE_DERIVE:
218         pkg_values[i].derive = htonll (vl->values[i].derive);
219         break;
220
221       case DS_TYPE_ABSOLUTE:
222         pkg_values[i].absolute = htonll (vl->values[i].absolute);
223         break;
224
225       default:
226         free (pkg_values_types);
227         free (pkg_values);
228         ERROR ("network plugin: write_part_values: "
229             "Unknown data source type: %i",
230             ds->ds[i].type);
231         return (-1);
232     } /* switch (ds->ds[i].type) */
233   } /* for (num_values) */
234
235   /*
236    * Use `memcpy' to write everything to the buffer, because the pointer
237    * may be unaligned and some architectures, such as SPARC, can't handle
238    * that.
239    */
240   packet_ptr = *ret_buffer;
241   offset = 0;
242   memcpy (packet_ptr + offset, &pkg_ph, sizeof (pkg_ph));
243   offset += sizeof (pkg_ph);
244   memcpy (packet_ptr + offset, &pkg_num_values, sizeof (pkg_num_values));
245   offset += sizeof (pkg_num_values);
246   memcpy (packet_ptr + offset, pkg_values_types, num_values * sizeof (uint8_t));
247   offset += num_values * sizeof (uint8_t);
248   memcpy (packet_ptr + offset, pkg_values, num_values * sizeof (value_t));
249   offset += num_values * sizeof (value_t);
250
251   assert (offset == packet_len);
252
253   *ret_buffer = packet_ptr + packet_len;
254   *ret_buffer_len -= packet_len;
255
256   free (pkg_values_types);
257   free (pkg_values);
258
259   return (0);
260 } /* int write_part_values */
261
262 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
263     int type, uint64_t value)
264 {
265   char *packet_ptr;
266   int packet_len;
267
268   part_header_t pkg_head;
269   uint64_t pkg_value;
270   
271   int offset;
272
273   packet_len = sizeof (pkg_head) + sizeof (pkg_value);
274
275   if (*ret_buffer_len < packet_len)
276     return (-1);
277
278   pkg_head.type = htons (type);
279   pkg_head.length = htons (packet_len);
280   pkg_value = htonll (value);
281
282   packet_ptr = *ret_buffer;
283   offset = 0;
284   memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
285   offset += sizeof (pkg_head);
286   memcpy (packet_ptr + offset, &pkg_value, sizeof (pkg_value));
287   offset += sizeof (pkg_value);
288
289   assert (offset == packet_len);
290
291   *ret_buffer = packet_ptr + packet_len;
292   *ret_buffer_len -= packet_len;
293
294   return (0);
295 } /* int write_part_number */
296
297 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
298     int type, const char *str, int str_len)
299 {
300   char *buffer;
301   int buffer_len;
302
303   uint16_t pkg_type;
304   uint16_t pkg_length;
305
306   int offset;
307
308   buffer_len = 2 * sizeof (uint16_t) + str_len + 1;
309   if (*ret_buffer_len < buffer_len)
310     return (-1);
311
312   pkg_type = htons (type);
313   pkg_length = htons (buffer_len);
314
315   buffer = *ret_buffer;
316   offset = 0;
317   memcpy (buffer + offset, (void *) &pkg_type, sizeof (pkg_type));
318   offset += sizeof (pkg_type);
319   memcpy (buffer + offset, (void *) &pkg_length, sizeof (pkg_length));
320   offset += sizeof (pkg_length);
321   memcpy (buffer + offset, str, str_len);
322   offset += str_len;
323   memset (buffer + offset, '\0', 1);
324   offset += 1;
325
326   assert (offset == buffer_len);
327
328   *ret_buffer = buffer + buffer_len;
329   *ret_buffer_len -= buffer_len;
330
331   return (0);
332 } /* int write_part_string */
333
334 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
335     value_t **ret_values, int *ret_num_values)
336 {
337   char *buffer = *ret_buffer;
338   size_t buffer_len = *ret_buffer_len;
339
340   uint16_t tmp16;
341   size_t exp_size;
342   int   i;
343
344   uint16_t pkg_length;
345   uint16_t pkg_type;
346   uint16_t pkg_numval;
347
348   uint8_t *pkg_types;
349   value_t *pkg_values;
350
351   if (buffer_len < 15)
352   {
353     NOTICE ("network plugin: packet is too short: "
354         "buffer_len = %zu", buffer_len);
355     return (-1);
356   }
357
358   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
359   buffer += sizeof (tmp16);
360   pkg_type = ntohs (tmp16);
361
362   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
363   buffer += sizeof (tmp16);
364   pkg_length = ntohs (tmp16);
365
366   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
367   buffer += sizeof (tmp16);
368   pkg_numval = ntohs (tmp16);
369
370   assert (pkg_type == TYPE_VALUES);
371
372   exp_size = 3 * sizeof (uint16_t)
373     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
374   if ((buffer_len < 0) || (buffer_len < exp_size))
375   {
376     WARNING ("network plugin: parse_part_values: "
377         "Packet too short: "
378         "Chunk of size %zu expected, "
379         "but buffer has only %zu bytes left.",
380         exp_size, buffer_len);
381     return (-1);
382   }
383
384   if (pkg_length != exp_size)
385   {
386     WARNING ("network plugin: parse_part_values: "
387         "Length and number of values "
388         "in the packet don't match.");
389     return (-1);
390   }
391
392   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
393   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
394   if ((pkg_types == NULL) || (pkg_values == NULL))
395   {
396     sfree (pkg_types);
397     sfree (pkg_values);
398     ERROR ("network plugin: parse_part_values: malloc failed.");
399     return (-1);
400   }
401
402   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
403   buffer += pkg_numval * sizeof (uint8_t);
404   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
405   buffer += pkg_numval * sizeof (value_t);
406
407   for (i = 0; i < pkg_numval; i++)
408   {
409     switch (pkg_types[i])
410     {
411       case DS_TYPE_COUNTER:
412         pkg_values[i].counter = (counter_t) ntohll (pkg_values[i].counter);
413         break;
414
415       case DS_TYPE_GAUGE:
416         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
417         break;
418
419       case DS_TYPE_DERIVE:
420         pkg_values[i].derive = (derive_t) ntohll (pkg_values[i].derive);
421         break;
422
423       case DS_TYPE_ABSOLUTE:
424         pkg_values[i].absolute = (absolute_t) ntohll (pkg_values[i].absolute);
425         break;
426
427       default:
428         NOTICE ("network plugin: parse_part_values: "
429       "Don't know how to handle data source type %"PRIu8,
430       pkg_types[i]);
431         sfree (pkg_types);
432         sfree (pkg_values);
433         return (-1);
434     } /* switch (pkg_types[i]) */
435   }
436
437   *ret_buffer     = buffer;
438   *ret_buffer_len = buffer_len - pkg_length;
439   *ret_num_values = pkg_numval;
440   *ret_values     = pkg_values;
441
442   sfree (pkg_types);
443
444   return (0);
445 } /* int parse_part_values */
446
447 static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */
448     value_list_t *vl_def,
449     const data_set_t *ds, const value_list_t *vl)
450 {
451   char *buffer_orig = buffer;
452   
453   if (write_part_string (&buffer, &buffer_size, TYPE_HOST, vl->host, strlen (vl->host)) != 0)
454     return (-1);
455   
456   if (write_part_number (&buffer, &buffer_size, TYPE_TIME, (uint64_t) vl->time))
457     return (-1);
458   
459   if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL, (uint64_t) vl->interval))
460     return (-1);
461     
462   if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, vl->plugin, strlen (vl->plugin)) != 0)
463     return (-1);
464   
465   if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0)
466     return (-1);
467   
468   if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, vl->type, strlen (vl->type)) != 0)
469     return (-1);
470   
471   if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0)
472     return (-1);
473   
474   if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
475     return (-1);
476
477   return (buffer - buffer_orig);
478 } /* }}} int add_to_buffer */
479
480 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
481     uint64_t *value)
482 {
483   char *buffer = *ret_buffer;
484   size_t buffer_len = *ret_buffer_len;
485
486   uint16_t tmp16;
487   uint64_t tmp64;
488   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
489
490   uint16_t pkg_length;
491   uint16_t pkg_type;
492
493   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
494   {
495     WARNING ("network plugin: parse_part_number: "
496         "Packet too short: "
497         "Chunk of size %zu expected, "
498         "but buffer has only %zu bytes left.",
499         exp_size, buffer_len);
500     return (-1);
501   }
502
503   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
504   buffer += sizeof (tmp16);
505   pkg_type = ntohs (tmp16);
506
507   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
508   buffer += sizeof (tmp16);
509   pkg_length = ntohs (tmp16);
510
511   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
512   buffer += sizeof (tmp64);
513   *value = ntohll (tmp64);
514
515   *ret_buffer = buffer;
516   *ret_buffer_len = buffer_len - pkg_length;
517
518   return (0);
519 } /* int parse_part_number */
520
521 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
522     char *output, int output_len)
523 {
524   char *buffer = *ret_buffer;
525   size_t buffer_len = *ret_buffer_len;
526
527   uint16_t tmp16;
528   size_t header_size = 2 * sizeof (uint16_t);
529
530   uint16_t pkg_length;
531   uint16_t pkg_type;
532
533   if ((buffer_len < 0) || (buffer_len < header_size))
534   {
535     WARNING ("network plugin: parse_part_string: "
536         "Packet too short: "
537         "Chunk of at least size %zu expected, "
538         "but buffer has only %zu bytes left.",
539         header_size, buffer_len);
540     return (-1);
541   }
542
543   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
544   buffer += sizeof (tmp16);
545   pkg_type = ntohs (tmp16);
546
547   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
548   buffer += sizeof (tmp16);
549   pkg_length = ntohs (tmp16);
550
551   /* Check that packet fits in the input buffer */
552   if (pkg_length > buffer_len)
553   {
554     WARNING ("network plugin: parse_part_string: "
555         "Packet too big: "
556         "Chunk of size %"PRIu16" received, "
557         "but buffer has only %zu bytes left.",
558         pkg_length, buffer_len);
559     return (-1);
560   }
561
562   /* Check that pkg_length is in the valid range */
563   if (pkg_length <= header_size)
564   {
565     WARNING ("network plugin: parse_part_string: "
566         "Packet too short: "
567         "Header claims this packet is only %hu "
568         "bytes long.", pkg_length);
569     return (-1);
570   }
571
572   /* Check that the package data fits into the output buffer.
573    * The previous if-statement ensures that:
574    * `pkg_length > header_size' */
575   if ((output_len < 0)
576       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
577   {
578     WARNING ("network plugin: parse_part_string: "
579         "Output buffer too small.");
580     return (-1);
581   }
582
583   /* All sanity checks successfull, let's copy the data over */
584   output_len = pkg_length - header_size;
585   memcpy ((void *) output, (void *) buffer, output_len);
586   buffer += output_len;
587
588   /* For some very weird reason '\0' doesn't do the trick on SPARC in
589    * this statement. */
590   if (output[output_len - 1] != 0)
591   {
592     WARNING ("network plugin: parse_part_string: "
593         "Received string does not end "
594         "with a NULL-byte.");
595     return (-1);
596   }
597
598   *ret_buffer = buffer;
599   *ret_buffer_len = buffer_len - pkg_length;
600
601   return (0);
602 } /* int parse_part_string */
603
604
605 static int parse_packet (void *se, /* {{{ */
606     void *buffer, size_t buffer_size, int flags,
607     const char *username)
608 {
609   int status;
610
611   value_list_t vl = VALUE_LIST_INIT;
612   notification_t n;
613
614 #if HAVE_LIBGCRYPT
615   int packet_was_signed = (flags & PP_SIGNED);
616         int packet_was_encrypted = (flags & PP_ENCRYPTED);
617   int printed_ignore_warning = 0;
618 #endif /* HAVE_LIBGCRYPT */
619
620
621   memset (&vl, '\0', sizeof (vl));
622   memset (&n, '\0', sizeof (n));
623   status = 0;
624
625   while ((status == 0) && (0 < buffer_size)
626       && ((unsigned int) buffer_size > sizeof (part_header_t)))
627   {
628     uint16_t pkg_length;
629     uint16_t pkg_type;
630
631     memcpy ((void *) &pkg_type,
632         (void *) buffer,
633         sizeof (pkg_type));
634     memcpy ((void *) &pkg_length,
635         (void *) (buffer + sizeof (pkg_type)),
636         sizeof (pkg_length));
637
638     pkg_length = ntohs (pkg_length);
639     pkg_type = ntohs (pkg_type);
640
641     if (pkg_length > buffer_size)
642       break;
643     /* Ensure that this loop terminates eventually */
644     if (pkg_length < (2 * sizeof (uint16_t)))
645       break;
646
647     if (pkg_type == TYPE_ENCR_AES256)
648     {
649       // status = parse_part_encr_aes256 (se,
650       //     &buffer, &buffer_size, flags);
651       // if (status != 0)
652       // {
653         ERROR ("network plugin: Decrypting AES256 "
654             "part failed "
655             "with status %i.", status);
656         break;
657       // }
658     }
659 #if HAVE_LIBGCRYPT
660     else if ((se->data.server.security_level == SECURITY_LEVEL_ENCRYPT)
661         && (packet_was_encrypted == 0))
662     {
663       if (printed_ignore_warning == 0)
664       {
665         INFO ("network plugin: Unencrypted packet or "
666             "part has been ignored.");
667         printed_ignore_warning = 1;
668       }
669       buffer = ((char *) buffer) + pkg_length;
670       continue;
671     }
672 #endif /* HAVE_LIBGCRYPT */
673     else if (pkg_type == TYPE_SIGN_SHA256)
674     {
675       // status = parse_part_sign_sha256 (se,
676       //                                   &buffer, &buffer_size, flags);
677       // if (status != 0)
678       // {
679         ERROR ("network plugin: Verifying HMAC-SHA-256 "
680             "signature failed "
681             "with status %i.", status);
682         break;
683       // }
684     }
685 #if HAVE_LIBGCRYPT
686     else if ((se->data.server.security_level == SECURITY_LEVEL_SIGN)
687         && (packet_was_encrypted == 0)
688         && (packet_was_signed == 0))
689     {
690       if (printed_ignore_warning == 0)
691       {
692         INFO ("network plugin: Unsigned packet or "
693             "part has been ignored.");
694         printed_ignore_warning = 1;
695       }
696       buffer = ((char *) buffer) + pkg_length;
697       continue;
698     }
699 #endif /* HAVE_LIBGCRYPT */
700     else if (pkg_type == TYPE_VALUES)
701     {
702       status = parse_part_values (&buffer, &buffer_size,
703           &vl.values, &vl.values_len);
704       if (status != 0)
705         break;
706
707       network_dispatch_values (&vl, username);
708
709       sfree (vl.values);
710     }
711     else if (pkg_type == TYPE_TIME)
712     {
713       uint64_t tmp = 0;
714       status = parse_part_number (&buffer, &buffer_size,
715           &tmp);
716       if (status == 0)
717       {
718         vl.time = (time_t) tmp;
719         n.time = (time_t) tmp;
720       }
721     }
722     else if (pkg_type == TYPE_INTERVAL)
723     {
724       uint64_t tmp = 0;
725       status = parse_part_number (&buffer, &buffer_size,
726           &tmp);
727       if (status == 0)
728         vl.interval = (int) tmp;
729     }
730     else if (pkg_type == TYPE_HOST)
731     {
732       status = parse_part_string (&buffer, &buffer_size,
733           vl.host, sizeof (vl.host));
734       if (status == 0)
735         sstrncpy (n.host, vl.host, sizeof (n.host));
736     }
737     else if (pkg_type == TYPE_PLUGIN)
738     {
739       status = parse_part_string (&buffer, &buffer_size,
740           vl.plugin, sizeof (vl.plugin));
741       if (status == 0)
742         sstrncpy (n.plugin, vl.plugin,
743             sizeof (n.plugin));
744     }
745     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
746     {
747       status = parse_part_string (&buffer, &buffer_size,
748           vl.plugin_instance,
749           sizeof (vl.plugin_instance));
750       if (status == 0)
751         sstrncpy (n.plugin_instance,
752             vl.plugin_instance,
753             sizeof (n.plugin_instance));
754     }
755     else if (pkg_type == TYPE_TYPE)
756     {
757       status = parse_part_string (&buffer, &buffer_size,
758           vl.type, sizeof (vl.type));
759       if (status == 0)
760         sstrncpy (n.type, vl.type, sizeof (n.type));
761     }
762     else if (pkg_type == TYPE_TYPE_INSTANCE)
763     {
764       status = parse_part_string (&buffer, &buffer_size,
765           vl.type_instance,
766           sizeof (vl.type_instance));
767       if (status == 0)
768         sstrncpy (n.type_instance, vl.type_instance,
769             sizeof (n.type_instance));
770     }
771     else if (pkg_type == TYPE_MESSAGE)
772     {
773       status = parse_part_string (&buffer, &buffer_size,
774           n.message, sizeof (n.message));
775
776       if (status != 0)
777       {
778         /* do nothing */
779       }
780       else if ((n.severity != NOTIF_FAILURE)
781           && (n.severity != NOTIF_WARNING)
782           && (n.severity != NOTIF_OKAY))
783       {
784         INFO ("network plugin: "
785             "Ignoring notification with "
786             "unknown severity %i.",
787             n.severity);
788       }
789       else if (n.time <= 0)
790       {
791         INFO ("network plugin: "
792             "Ignoring notification with "
793             "time == 0.");
794       }
795       else if (strlen (n.message) <= 0)
796       {
797         INFO ("network plugin: "
798             "Ignoring notification with "
799             "an empty message.");
800       }
801       else
802       {
803         plugin_dispatch_notification (&n);
804       }
805     }
806     else if (pkg_type == TYPE_SEVERITY)
807     {
808       uint64_t tmp = 0;
809       status = parse_part_number (&buffer, &buffer_size,
810           &tmp);
811       if (status == 0)
812         n.severity = (int) tmp;
813     }
814     else
815     {
816       DEBUG ("network plugin: parse_packet: Unknown part"
817           " type: 0x%04hx", pkg_type);
818       buffer = ((char *) buffer) + pkg_length;
819     }
820   } /* while (buffer_size > sizeof (part_header_t)) */
821
822   if (status == 0 && buffer_size > 0)
823     WARNING ("network plugin: parse_packet: Received truncated "
824         "packet, try increasing `MaxPacketSize'");
825
826   return (status);
827 } /* }}} int parse_packet */
828
829
830
831
832
833
834
835
836 ////////////////////////////
837 //// END OF COPY / PASTE ///
838 ////////////////////////////
839
840
841 // config data
842 static char *zmq_send_to = NULL;
843
844 // private data
845 static int thread_running = 1;
846 static pthread_t listen_thread_id;
847 static void *push_socket = NULL;
848
849 static void cmq_close_callback (void *socket) /* {{{ */
850 {
851   if (socket != NULL)
852     (void) zmq_close (socket);
853 } /* }}} void cmq_close_callback */
854
855 static void free_data (void *data, void *hint) /* {{{ */
856 {
857   free (data);
858 } /* }}} void free_data */
859
860 static void *receive_thread (void *cmq_socket) /* {{{ */
861 {
862   int status;
863   char *data = NULL;
864   size_t data_size;
865
866   assert (cmq_socket != NULL);
867
868   while (thread_running)
869   {
870     zmq_msg_t msg;
871
872     (void) zmq_msg_init (&msg);
873
874     status = zmq_recv (cmq_socket, &msg, /* flags = */ 0);
875     if (status != 0)
876     {
877       if ((errno == EAGAIN) || (errno == EINTR))
878         continue;
879
880       ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno));
881       break;
882     }
883
884     data = zmq_msg_data (&msg);
885     data_size = zmq_msg_size (&msg);
886
887     status = parse_packet (NULL, data, data_size,
888         /* flags = */ 0,
889         /* username = */ NULL);
890     DEBUG("zeromq plugin: received data, parse returned %d", status);
891
892     (void) zmq_msg_close (&msg);
893   } /* while (thread_running) */
894
895   DEBUG ("zeromq plugin: Receive thread is terminating.");
896   (void) zmq_close (cmq_socket);
897   
898   return (NULL);
899 } /* }}} void *receive_thread */
900
901 #define PACKET_SIZE   512
902
903 static int write_value (const data_set_t *ds, /* {{{ */
904     const value_list_t *vl,
905     user_data_t *user_data)
906 {
907   void *cmq_socket = user_data->data;
908
909   zmq_msg_t msg;
910   char      *send_buffer;
911   int       send_buffer_size = PACKET_SIZE, real_size;
912
913   send_buffer = malloc(PACKET_SIZE);
914   if( send_buffer == NULL ) {
915     ERROR("Unable to allocate memory for send_buffer, aborting write");
916     return 1;
917   }
918
919   // empty buffer
920   memset(send_buffer, 0, PACKET_SIZE);
921
922   real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
923
924   // zeromq will free the memory when needed by calling the free_data function
925   if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
926     ERROR("zmq_msg_init : %s", zmq_strerror(errno));
927     return 1;
928   }
929
930   // try to send the message
931   if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
932     if( errno == EAGAIN ) {
933       WARNING("ZeroMQ: Cannot send message, queue is full");
934     }
935     else {
936       ERROR("zmq_send : %s", zmq_strerror(errno));
937       return 1;
938     }
939   }
940
941   DEBUG("ZeroMQ: data sent");
942
943   return 0;
944 } /* }}} int write_value */
945
946 static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */
947 {
948   char buffer[64] = "";
949   int status;
950
951   status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
952   if (status != 0)
953     return (-1);
954
955   if (strcasecmp ("Publish", buffer) == 0)
956     return (ZMQ_PUB);
957   else if (strcasecmp ("Subscribe", buffer) == 0)
958     return (ZMQ_SUB);
959   else if (strcasecmp ("Push", buffer) == 0)
960     return (ZMQ_PUSH);
961   else if (strcasecmp ("Pull", buffer) == 0)
962     return (ZMQ_PULL);
963   
964   ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"",
965       buffer);
966   return (-1);
967 } /* }}} int cmq_config_mode */
968
969 static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
970 {
971   int type;
972   int status;
973   int i;
974   int endpoints_num;
975   void *cmq_socket;
976
977   type = cmq_config_mode (ci);
978   if (type < 0)
979     return (-1);
980
981   if (cmq_context == NULL)
982   {
983     cmq_context = zmq_init (cmq_threads_num);
984     if (cmq_context == NULL)
985     {
986       ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s",
987           zmq_strerror (errno));
988       return (-1);
989     }
990   }
991
992   /* Create a new socket */
993   cmq_socket = zmq_socket (cmq_context, type);
994   if (cmq_socket == NULL)
995   {
996     ERROR ("zeromq plugin: zmq_socket failed: %s",
997         zmq_strerror (errno));
998     return (-1);
999   }
1000
1001   if (type == ZMQ_SUB)
1002   {
1003     /* Subscribe to all messages */
1004     status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE,
1005         /* prefix = */ "", /* prefix length = */ 0);
1006     if (status != 0)
1007     {
1008       ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s",
1009           zmq_strerror (errno));
1010       (void) zmq_close (cmq_socket);
1011       return (-1);
1012     }
1013   }
1014
1015   /* Iterate over all children and do all the binds and connects requested. */
1016   endpoints_num = 0;
1017   for (i = 0; i < ci->children_num; i++)
1018   {
1019     oconfig_item_t *child = ci->children + i;
1020
1021     if (strcasecmp ("Endpoint", child->key) == 0)
1022     {
1023       char *value = NULL;
1024
1025       status = cf_util_get_string (child, &value);
1026       if (status != 0)
1027         continue;
1028       sfree (value);
1029
1030       if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
1031       {
1032         status = zmq_bind (cmq_socket, value);
1033         if (status != 0)
1034         {
1035           ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
1036               value, zmq_strerror (errno));
1037           continue;
1038         }
1039       }
1040       else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
1041       {
1042         status = zmq_connect (cmq_socket, value);
1043         if (status != 0)
1044         {
1045           ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s",
1046               value, zmq_strerror (errno));
1047           continue;
1048         }
1049       }
1050       else
1051       {
1052         assert (23 == 42);
1053       }
1054
1055       endpoints_num++;
1056       continue;
1057     } /* Endpoint */
1058     else
1059     {
1060       ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
1061           child->key);
1062     }
1063   } /* for (i = 0; i < ci->children_num; i++) */
1064
1065   if (endpoints_num == 0)
1066   {
1067     ERROR ("zeromq plugin: No (valid) \"Endpoint\" option was found in this "
1068         "\"Socket\" block.");
1069     (void) zmq_close (cmq_socket);
1070     return (-1);
1071   }
1072
1073   /* If this is a receiving socket, create a new receive thread */
1074   if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
1075   {
1076     pthread_t *thread_ptr;
1077
1078     thread_ptr = realloc (receive_thread_ids,
1079         sizeof (*receive_thread_ids) * (receive_thread_num + 1));
1080     if (thread_ptr == NULL)
1081     {
1082       ERROR ("zeromq plugin: realloc failed.");
1083       return (-1);
1084     }
1085     receive_thread_ids = thread_ptr;
1086     thread_ptr = receive_thread_ids + receive_thread_num;
1087
1088     status = pthread_create (thread_ptr,
1089         /* attr = */ NULL,
1090         /* func = */ receive_thread,
1091         /* args = */ cmq_socket);
1092     if (status != 0)
1093     {
1094       char errbuf[1024];
1095       ERROR ("zeromq plugin: pthread_create failed: %s",
1096           sstrerror (errno, errbuf, sizeof (errbuf)));
1097       (void) zmq_close (cmq_socket);
1098       return (-1);
1099     }
1100
1101     receive_thread_num++;
1102   }
1103
1104   /* If this is a sending socket, register a new write function */
1105   else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
1106   {
1107     user_data_t ud = { NULL, NULL };
1108     char name[32];
1109
1110     ud.data = cmq_socket;
1111     ud.free_func = cmq_close_callback;
1112
1113     ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
1114     sending_sockets_num++;
1115
1116     plugin_register_write (name, write_value, &ud);
1117   }
1118
1119   return (0);
1120 } /* }}} int cmq_config_socket */
1121
1122 /*
1123  * Config schema:
1124  *
1125  * <Plugin "zeromq">
1126  *   <Socket Publish>
1127  *     Endpoint "tcp://localhost:6666"
1128  *   </Socket>
1129  *   <Socket Subscribe>
1130  *     Endpoint "tcp://eth0:6666"
1131  *     Endpoint "tcp://collectd.example.com:6666"
1132  *   </Socket>
1133  * </Plugin>
1134  */
1135 static int cmq_config (oconfig_item_t *ci) /* {{{ */
1136 {
1137   int status;
1138   int i;
1139
1140   for (i = 0; i < ci->children_num; i++)
1141   {
1142     oconfig_item_t *child = ci->children + i;
1143
1144     if (strcasecmp ("Socket", child->key) == 0)
1145       status = cmq_config_socket (child);
1146     else if (strcasecmp ("Threads", child->key) == 0)
1147     {
1148       int tmp = 0;
1149       status = cf_util_get_int (child, &tmp);
1150       if ((status == 0) && (tmp >= 1))
1151         cmq_threads_num = tmp;
1152     }
1153     else
1154     {
1155       WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.",
1156           child->key);
1157     }
1158   }
1159
1160   return (0);
1161 } /* }}} int cmq_config */
1162
1163 static int plugin_init (void)
1164 {
1165   int major, minor, patch;
1166   zmq_version (&major, &minor, &patch);
1167   
1168   /* init zeromq (1 I/O thread) */
1169   if (cmq_context == NULL)
1170     cmq_context = zmq_init(1);
1171
1172   if( cmq_context == NULL ) {
1173     ERROR("zmq_init : %s", zmq_strerror(errno));
1174     return 1;
1175   }
1176   
1177   // start send socket
1178   if( zmq_send_to != NULL ) {
1179     push_socket = zmq_socket(cmq_context, ZMQ_PUSH);
1180     
1181     if( push_socket == NULL ) {
1182       ERROR("zmq_socket : %s", zmq_strerror(errno));
1183       return 1;
1184     }
1185     
1186     // and connect to remote host
1187     if( zmq_connect(push_socket, zmq_send_to) != 0 ) {
1188       ERROR("zmq_connect : %s", zmq_strerror(errno));
1189       return 1;
1190     }
1191     
1192     INFO("ZeroMQ pushing to %s", zmq_send_to);
1193   }
1194   
1195   
1196   
1197   INFO("ZeroMQ plugin initialized (zeromq v%d.%d.%d).", major, minor, patch);
1198   return 0;
1199 }
1200
1201
1202
1203 static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data)
1204 {
1205   DEBUG("ZeroMQ: received notification, not implemented yet");
1206   return 0;
1207 }
1208
1209 static int my_shutdown (void)
1210 {
1211   if( cmq_context ) {
1212     
1213     thread_running = 0;
1214     
1215     DEBUG("ZeroMQ: shutting down");
1216     
1217     if( zmq_term(cmq_context) != 0 ) {
1218       ERROR("zmq_term : %s", zmq_strerror(errno));
1219       return 1;
1220     }
1221     
1222     pthread_join(listen_thread_id, NULL);
1223   }
1224   
1225   return 0;
1226 }
1227
1228 void module_register (void)
1229 {
1230   plugin_register_complex_config("zeromq", cmq_config);
1231   plugin_register_init("zeromq", plugin_init);
1232   plugin_register_write("zeromq", write_value,
1233       /* user_data = */ NULL);
1234   plugin_register_notification ("network", write_notification,
1235       /* user_data = */ NULL);
1236   plugin_register_shutdown ("zeromq", my_shutdown);
1237 }