8397c00803c8f7f214adb6edb697ad1f9b2c024f
[collectd.git] / src / zeromq.c
1 /**
2  * collectd - src/zeromq.c
3  * Copyright (C) 2005-2010  Florian octo Forster
4  * Copyright (C) 2009       Aman Gupta
5  * Copyright (C) 2010       Julien Ammous
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at verplant.org>
22  *   Aman Gupta <aman at tmm1.net>
23  *   Julien Ammous
24  **/
25
26 #include "collectd.h"
27 #include "common.h" /* auxiliary functions */
28 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
29 #include "utils_cache.h"
30 #include "network.h"
31
32 /* for htons() */
33 #if HAVE_ARPA_INET_H
34 # include <arpa/inet.h>
35 #endif
36 #include <pthread.h>
37 #include <zmq.h>
38
39 // copy/paste from network.c ...
40
41 static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
42
43 static uint64_t stats_values_not_dispatched = 0;
44 static uint64_t stats_values_dispatched = 0;
45
46 struct cmq_socket_s
47 {
48         void *socket;
49         int type;
50 };
51 typedef struct cmq_socket_s cmq_socket_t;
52
53 static int cmq_threads_num = 1;
54 static void *cmq_context = NULL;
55
56 static pthread_t *receive_thread_ids = NULL;
57 static size_t     receive_thread_num = 0;
58 static int        sending_sockets_num = 0;
59
60 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
61  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
62  * +-------+-----------------------+-------------------------------+
63  * ! Ver.  !                       ! Length                        !
64  * +-------+-----------------------+-------------------------------+
65  */
66 struct part_header_s
67 {
68         uint16_t type;
69         uint16_t length;
70 };
71 typedef struct part_header_s part_header_t;
72
73 // we do not want to crypt here
74 #undef HAVE_LIBGCRYPT
75
76 static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */
77 {
78   uint64_t time_sent = 0;
79   int status;
80
81   status = uc_meta_data_get_unsigned_int (vl,
82       "zeromq:time_sent", &time_sent);
83
84   /* This is a value we already sent. Don't allow it to be received again in
85    * order to avoid looping. */
86   if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
87     return (0);
88
89   return (1);
90 } /* }}} _Bool check_receive_okay */
91
92 static int network_dispatch_values (value_list_t *vl, /* {{{ */
93     const char *username)
94 {
95   int status;
96   
97   // DEBUG("host: %s", vl->host);
98   // DEBUG("plugin: %s", vl->plugin);
99   // DEBUG("plugin_instance: %s", vl->plugin_instance);
100   // DEBUG("type: %s", vl->type);
101   // DEBUG("type_instance: %s", vl->type_instance);
102
103   if ((vl->time <= 0)
104       || (strlen (vl->host) <= 0)
105       || (strlen (vl->plugin) <= 0)
106       || (strlen (vl->type) <= 0))
107     return (-EINVAL);
108   
109   if (!check_receive_okay (vl))
110   {
111 #if COLLECT_DEBUG
112     char name[6*DATA_MAX_NAME_LEN];
113     FORMAT_VL (name, sizeof (name), vl);
114     name[sizeof (name) - 1] = 0;
115     DEBUG ("network plugin: network_dispatch_values: "
116         "NOT dispatching %s.", name);
117 #endif
118     stats_values_not_dispatched++;
119     return (0);
120   }
121
122   assert (vl->meta == NULL);
123
124   vl->meta = meta_data_create ();
125   if (vl->meta == NULL)
126   {
127     ERROR ("network plugin: meta_data_create failed.");
128     return (-ENOMEM);
129   }
130
131   status = meta_data_add_boolean (vl->meta, "zeromq:received", 1);
132   if (status != 0)
133   {
134     ERROR ("network plugin: meta_data_add_boolean failed.");
135     meta_data_destroy (vl->meta);
136     vl->meta = NULL;
137     return (status);
138   }
139
140   if (username != NULL)
141   {
142     status = meta_data_add_string (vl->meta, "zeromq:username", username);
143     if (status != 0)
144     {
145       ERROR ("network plugin: meta_data_add_string failed.");
146       meta_data_destroy (vl->meta);
147       vl->meta = NULL;
148       return (status);
149     }
150   }
151   
152   // DEBUG("dispatching %d values", vl->values_len);
153   plugin_dispatch_values (vl);
154   stats_values_dispatched++;
155
156   meta_data_destroy (vl->meta);
157   vl->meta = NULL;
158
159   return (0);
160 } /* }}} int network_dispatch_values */
161
162 static int write_part_values (char **ret_buffer, int *ret_buffer_len, const data_set_t *ds, const value_list_t *vl)
163 {
164   char *packet_ptr;
165   int packet_len;
166   int num_values;
167
168   part_header_t pkg_ph;
169   uint16_t      pkg_num_values;
170   uint8_t      *pkg_values_types;
171   value_t      *pkg_values;
172
173   int offset;
174   int i;
175
176   num_values = vl->values_len;
177   packet_len = sizeof (part_header_t) + sizeof (uint16_t)
178     + (num_values * sizeof (uint8_t))
179     + (num_values * sizeof (value_t));
180
181   if (*ret_buffer_len < packet_len)
182     return (-1);
183
184   pkg_values_types = (uint8_t *) malloc (num_values * sizeof (uint8_t));
185   if (pkg_values_types == NULL)
186   {
187     ERROR ("network plugin: write_part_values: malloc failed.");
188     return (-1);
189   }
190
191   pkg_values = (value_t *) malloc (num_values * sizeof (value_t));
192   if (pkg_values == NULL)
193   {
194     free (pkg_values_types);
195     ERROR ("network plugin: write_part_values: malloc failed.");
196     return (-1);
197   }
198
199   pkg_ph.type = htons (TYPE_VALUES);
200   pkg_ph.length = htons (packet_len);
201
202   pkg_num_values = htons ((uint16_t) vl->values_len);
203
204   for (i = 0; i < num_values; i++)
205   {
206     pkg_values_types[i] = (uint8_t) ds->ds[i].type;
207     switch (ds->ds[i].type)
208     {
209       case DS_TYPE_COUNTER:
210         pkg_values[i].counter = htonll (vl->values[i].counter);
211         break;
212
213       case DS_TYPE_GAUGE:
214         pkg_values[i].gauge = htond (vl->values[i].gauge);
215         break;
216
217       case DS_TYPE_DERIVE:
218         pkg_values[i].derive = htonll (vl->values[i].derive);
219         break;
220
221       case DS_TYPE_ABSOLUTE:
222         pkg_values[i].absolute = htonll (vl->values[i].absolute);
223         break;
224
225       default:
226         free (pkg_values_types);
227         free (pkg_values);
228         ERROR ("network plugin: write_part_values: "
229             "Unknown data source type: %i",
230             ds->ds[i].type);
231         return (-1);
232     } /* switch (ds->ds[i].type) */
233   } /* for (num_values) */
234
235   /*
236    * Use `memcpy' to write everything to the buffer, because the pointer
237    * may be unaligned and some architectures, such as SPARC, can't handle
238    * that.
239    */
240   packet_ptr = *ret_buffer;
241   offset = 0;
242   memcpy (packet_ptr + offset, &pkg_ph, sizeof (pkg_ph));
243   offset += sizeof (pkg_ph);
244   memcpy (packet_ptr + offset, &pkg_num_values, sizeof (pkg_num_values));
245   offset += sizeof (pkg_num_values);
246   memcpy (packet_ptr + offset, pkg_values_types, num_values * sizeof (uint8_t));
247   offset += num_values * sizeof (uint8_t);
248   memcpy (packet_ptr + offset, pkg_values, num_values * sizeof (value_t));
249   offset += num_values * sizeof (value_t);
250
251   assert (offset == packet_len);
252
253   *ret_buffer = packet_ptr + packet_len;
254   *ret_buffer_len -= packet_len;
255
256   free (pkg_values_types);
257   free (pkg_values);
258
259   return (0);
260 } /* int write_part_values */
261
262 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
263     int type, uint64_t value)
264 {
265   char *packet_ptr;
266   int packet_len;
267
268   part_header_t pkg_head;
269   uint64_t pkg_value;
270   
271   int offset;
272
273   packet_len = sizeof (pkg_head) + sizeof (pkg_value);
274
275   if (*ret_buffer_len < packet_len)
276     return (-1);
277
278   pkg_head.type = htons (type);
279   pkg_head.length = htons (packet_len);
280   pkg_value = htonll (value);
281
282   packet_ptr = *ret_buffer;
283   offset = 0;
284   memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
285   offset += sizeof (pkg_head);
286   memcpy (packet_ptr + offset, &pkg_value, sizeof (pkg_value));
287   offset += sizeof (pkg_value);
288
289   assert (offset == packet_len);
290
291   *ret_buffer = packet_ptr + packet_len;
292   *ret_buffer_len -= packet_len;
293
294   return (0);
295 } /* int write_part_number */
296
297 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
298     int type, const char *str, int str_len)
299 {
300   char *buffer;
301   int buffer_len;
302
303   uint16_t pkg_type;
304   uint16_t pkg_length;
305
306   int offset;
307
308   buffer_len = 2 * sizeof (uint16_t) + str_len + 1;
309   if (*ret_buffer_len < buffer_len)
310     return (-1);
311
312   pkg_type = htons (type);
313   pkg_length = htons (buffer_len);
314
315   buffer = *ret_buffer;
316   offset = 0;
317   memcpy (buffer + offset, (void *) &pkg_type, sizeof (pkg_type));
318   offset += sizeof (pkg_type);
319   memcpy (buffer + offset, (void *) &pkg_length, sizeof (pkg_length));
320   offset += sizeof (pkg_length);
321   memcpy (buffer + offset, str, str_len);
322   offset += str_len;
323   memset (buffer + offset, '\0', 1);
324   offset += 1;
325
326   assert (offset == buffer_len);
327
328   *ret_buffer = buffer + buffer_len;
329   *ret_buffer_len -= buffer_len;
330
331   return (0);
332 } /* int write_part_string */
333
334 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
335     value_t **ret_values, int *ret_num_values)
336 {
337   char *buffer = *ret_buffer;
338   size_t buffer_len = *ret_buffer_len;
339
340   uint16_t tmp16;
341   size_t exp_size;
342   int   i;
343
344   uint16_t pkg_length;
345   uint16_t pkg_type;
346   uint16_t pkg_numval;
347
348   uint8_t *pkg_types;
349   value_t *pkg_values;
350
351   if (buffer_len < 15)
352   {
353     NOTICE ("network plugin: packet is too short: "
354         "buffer_len = %zu", buffer_len);
355     return (-1);
356   }
357
358   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
359   buffer += sizeof (tmp16);
360   pkg_type = ntohs (tmp16);
361
362   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
363   buffer += sizeof (tmp16);
364   pkg_length = ntohs (tmp16);
365
366   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
367   buffer += sizeof (tmp16);
368   pkg_numval = ntohs (tmp16);
369
370   assert (pkg_type == TYPE_VALUES);
371
372   exp_size = 3 * sizeof (uint16_t)
373     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
374   if ((buffer_len < 0) || (buffer_len < exp_size))
375   {
376     WARNING ("network plugin: parse_part_values: "
377         "Packet too short: "
378         "Chunk of size %zu expected, "
379         "but buffer has only %zu bytes left.",
380         exp_size, buffer_len);
381     return (-1);
382   }
383
384   if (pkg_length != exp_size)
385   {
386     WARNING ("network plugin: parse_part_values: "
387         "Length and number of values "
388         "in the packet don't match.");
389     return (-1);
390   }
391
392   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
393   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
394   if ((pkg_types == NULL) || (pkg_values == NULL))
395   {
396     sfree (pkg_types);
397     sfree (pkg_values);
398     ERROR ("network plugin: parse_part_values: malloc failed.");
399     return (-1);
400   }
401
402   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
403   buffer += pkg_numval * sizeof (uint8_t);
404   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
405   buffer += pkg_numval * sizeof (value_t);
406
407   for (i = 0; i < pkg_numval; i++)
408   {
409     switch (pkg_types[i])
410     {
411       case DS_TYPE_COUNTER:
412         pkg_values[i].counter = (counter_t) ntohll (pkg_values[i].counter);
413         break;
414
415       case DS_TYPE_GAUGE:
416         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
417         break;
418
419       case DS_TYPE_DERIVE:
420         pkg_values[i].derive = (derive_t) ntohll (pkg_values[i].derive);
421         break;
422
423       case DS_TYPE_ABSOLUTE:
424         pkg_values[i].absolute = (absolute_t) ntohll (pkg_values[i].absolute);
425         break;
426
427       default:
428         NOTICE ("network plugin: parse_part_values: "
429       "Don't know how to handle data source type %"PRIu8,
430       pkg_types[i]);
431         sfree (pkg_types);
432         sfree (pkg_values);
433         return (-1);
434     } /* switch (pkg_types[i]) */
435   }
436
437   *ret_buffer     = buffer;
438   *ret_buffer_len = buffer_len - pkg_length;
439   *ret_num_values = pkg_numval;
440   *ret_values     = pkg_values;
441
442   sfree (pkg_types);
443
444   return (0);
445 } /* int parse_part_values */
446
447 static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */
448     value_list_t *vl_def,
449     const data_set_t *ds, const value_list_t *vl)
450 {
451   char *buffer_orig = buffer;
452   
453   if (write_part_string (&buffer, &buffer_size, TYPE_HOST, vl->host, strlen (vl->host)) != 0)
454     return (-1);
455   
456   if (write_part_number (&buffer, &buffer_size, TYPE_TIME, (uint64_t) vl->time))
457     return (-1);
458   
459   if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL, (uint64_t) vl->interval))
460     return (-1);
461     
462   if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, vl->plugin, strlen (vl->plugin)) != 0)
463     return (-1);
464   
465   if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0)
466     return (-1);
467   
468   if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, vl->type, strlen (vl->type)) != 0)
469     return (-1);
470   
471   if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0)
472     return (-1);
473   
474   if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
475     return (-1);
476
477   return (buffer - buffer_orig);
478 } /* }}} int add_to_buffer */
479
480 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
481     uint64_t *value)
482 {
483   char *buffer = *ret_buffer;
484   size_t buffer_len = *ret_buffer_len;
485
486   uint16_t tmp16;
487   uint64_t tmp64;
488   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
489
490   uint16_t pkg_length;
491   uint16_t pkg_type;
492
493   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
494   {
495     WARNING ("network plugin: parse_part_number: "
496         "Packet too short: "
497         "Chunk of size %zu expected, "
498         "but buffer has only %zu bytes left.",
499         exp_size, buffer_len);
500     return (-1);
501   }
502
503   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
504   buffer += sizeof (tmp16);
505   pkg_type = ntohs (tmp16);
506
507   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
508   buffer += sizeof (tmp16);
509   pkg_length = ntohs (tmp16);
510
511   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
512   buffer += sizeof (tmp64);
513   *value = ntohll (tmp64);
514
515   *ret_buffer = buffer;
516   *ret_buffer_len = buffer_len - pkg_length;
517
518   return (0);
519 } /* int parse_part_number */
520
521 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
522     char *output, int output_len)
523 {
524   char *buffer = *ret_buffer;
525   size_t buffer_len = *ret_buffer_len;
526
527   uint16_t tmp16;
528   size_t header_size = 2 * sizeof (uint16_t);
529
530   uint16_t pkg_length;
531   uint16_t pkg_type;
532
533   if ((buffer_len < 0) || (buffer_len < header_size))
534   {
535     WARNING ("network plugin: parse_part_string: "
536         "Packet too short: "
537         "Chunk of at least size %zu expected, "
538         "but buffer has only %zu bytes left.",
539         header_size, buffer_len);
540     return (-1);
541   }
542
543   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
544   buffer += sizeof (tmp16);
545   pkg_type = ntohs (tmp16);
546
547   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
548   buffer += sizeof (tmp16);
549   pkg_length = ntohs (tmp16);
550
551   /* Check that packet fits in the input buffer */
552   if (pkg_length > buffer_len)
553   {
554     WARNING ("network plugin: parse_part_string: "
555         "Packet too big: "
556         "Chunk of size %"PRIu16" received, "
557         "but buffer has only %zu bytes left.",
558         pkg_length, buffer_len);
559     return (-1);
560   }
561
562   /* Check that pkg_length is in the valid range */
563   if (pkg_length <= header_size)
564   {
565     WARNING ("network plugin: parse_part_string: "
566         "Packet too short: "
567         "Header claims this packet is only %hu "
568         "bytes long.", pkg_length);
569     return (-1);
570   }
571
572   /* Check that the package data fits into the output buffer.
573    * The previous if-statement ensures that:
574    * `pkg_length > header_size' */
575   if ((output_len < 0)
576       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
577   {
578     WARNING ("network plugin: parse_part_string: "
579         "Output buffer too small.");
580     return (-1);
581   }
582
583   /* All sanity checks successfull, let's copy the data over */
584   output_len = pkg_length - header_size;
585   memcpy ((void *) output, (void *) buffer, output_len);
586   buffer += output_len;
587
588   /* For some very weird reason '\0' doesn't do the trick on SPARC in
589    * this statement. */
590   if (output[output_len - 1] != 0)
591   {
592     WARNING ("network plugin: parse_part_string: "
593         "Received string does not end "
594         "with a NULL-byte.");
595     return (-1);
596   }
597
598   *ret_buffer = buffer;
599   *ret_buffer_len = buffer_len - pkg_length;
600
601   return (0);
602 } /* int parse_part_string */
603
604
605 static int parse_packet (void *se, /* {{{ */
606     void *buffer, size_t buffer_size, int flags,
607     const char *username)
608 {
609   int status;
610
611   value_list_t vl = VALUE_LIST_INIT;
612   notification_t n;
613
614 #if HAVE_LIBGCRYPT
615   int packet_was_signed = (flags & PP_SIGNED);
616         int packet_was_encrypted = (flags & PP_ENCRYPTED);
617   int printed_ignore_warning = 0;
618 #endif /* HAVE_LIBGCRYPT */
619
620
621   memset (&vl, '\0', sizeof (vl));
622   memset (&n, '\0', sizeof (n));
623   status = 0;
624
625   while ((status == 0) && (0 < buffer_size)
626       && ((unsigned int) buffer_size > sizeof (part_header_t)))
627   {
628     uint16_t pkg_length;
629     uint16_t pkg_type;
630
631     memcpy ((void *) &pkg_type,
632         (void *) buffer,
633         sizeof (pkg_type));
634     memcpy ((void *) &pkg_length,
635         (void *) (buffer + sizeof (pkg_type)),
636         sizeof (pkg_length));
637
638     pkg_length = ntohs (pkg_length);
639     pkg_type = ntohs (pkg_type);
640
641     if (pkg_length > buffer_size)
642       break;
643     /* Ensure that this loop terminates eventually */
644     if (pkg_length < (2 * sizeof (uint16_t)))
645       break;
646
647     if (pkg_type == TYPE_ENCR_AES256)
648     {
649       // status = parse_part_encr_aes256 (se,
650       //     &buffer, &buffer_size, flags);
651       // if (status != 0)
652       // {
653         ERROR ("network plugin: Decrypting AES256 "
654             "part failed "
655             "with status %i.", status);
656         break;
657       // }
658     }
659 #if HAVE_LIBGCRYPT
660     else if ((se->data.server.security_level == SECURITY_LEVEL_ENCRYPT)
661         && (packet_was_encrypted == 0))
662     {
663       if (printed_ignore_warning == 0)
664       {
665         INFO ("network plugin: Unencrypted packet or "
666             "part has been ignored.");
667         printed_ignore_warning = 1;
668       }
669       buffer = ((char *) buffer) + pkg_length;
670       continue;
671     }
672 #endif /* HAVE_LIBGCRYPT */
673     else if (pkg_type == TYPE_SIGN_SHA256)
674     {
675       // status = parse_part_sign_sha256 (se,
676       //                                   &buffer, &buffer_size, flags);
677       // if (status != 0)
678       // {
679         ERROR ("network plugin: Verifying HMAC-SHA-256 "
680             "signature failed "
681             "with status %i.", status);
682         break;
683       // }
684     }
685 #if HAVE_LIBGCRYPT
686     else if ((se->data.server.security_level == SECURITY_LEVEL_SIGN)
687         && (packet_was_encrypted == 0)
688         && (packet_was_signed == 0))
689     {
690       if (printed_ignore_warning == 0)
691       {
692         INFO ("network plugin: Unsigned packet or "
693             "part has been ignored.");
694         printed_ignore_warning = 1;
695       }
696       buffer = ((char *) buffer) + pkg_length;
697       continue;
698     }
699 #endif /* HAVE_LIBGCRYPT */
700     else if (pkg_type == TYPE_VALUES)
701     {
702       status = parse_part_values (&buffer, &buffer_size,
703           &vl.values, &vl.values_len);
704       if (status != 0)
705         break;
706
707       network_dispatch_values (&vl, username);
708
709       sfree (vl.values);
710     }
711     else if (pkg_type == TYPE_TIME)
712     {
713       uint64_t tmp = 0;
714       status = parse_part_number (&buffer, &buffer_size,
715           &tmp);
716       if (status == 0)
717       {
718         vl.time = (time_t) tmp;
719         n.time = (time_t) tmp;
720       }
721     }
722     else if (pkg_type == TYPE_INTERVAL)
723     {
724       uint64_t tmp = 0;
725       status = parse_part_number (&buffer, &buffer_size,
726           &tmp);
727       if (status == 0)
728         vl.interval = (int) tmp;
729     }
730     else if (pkg_type == TYPE_HOST)
731     {
732       status = parse_part_string (&buffer, &buffer_size,
733           vl.host, sizeof (vl.host));
734       if (status == 0)
735         sstrncpy (n.host, vl.host, sizeof (n.host));
736     }
737     else if (pkg_type == TYPE_PLUGIN)
738     {
739       status = parse_part_string (&buffer, &buffer_size,
740           vl.plugin, sizeof (vl.plugin));
741       if (status == 0)
742         sstrncpy (n.plugin, vl.plugin,
743             sizeof (n.plugin));
744     }
745     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
746     {
747       status = parse_part_string (&buffer, &buffer_size,
748           vl.plugin_instance,
749           sizeof (vl.plugin_instance));
750       if (status == 0)
751         sstrncpy (n.plugin_instance,
752             vl.plugin_instance,
753             sizeof (n.plugin_instance));
754     }
755     else if (pkg_type == TYPE_TYPE)
756     {
757       status = parse_part_string (&buffer, &buffer_size,
758           vl.type, sizeof (vl.type));
759       if (status == 0)
760         sstrncpy (n.type, vl.type, sizeof (n.type));
761     }
762     else if (pkg_type == TYPE_TYPE_INSTANCE)
763     {
764       status = parse_part_string (&buffer, &buffer_size,
765           vl.type_instance,
766           sizeof (vl.type_instance));
767       if (status == 0)
768         sstrncpy (n.type_instance, vl.type_instance,
769             sizeof (n.type_instance));
770     }
771     else if (pkg_type == TYPE_MESSAGE)
772     {
773       status = parse_part_string (&buffer, &buffer_size,
774           n.message, sizeof (n.message));
775
776       if (status != 0)
777       {
778         /* do nothing */
779       }
780       else if ((n.severity != NOTIF_FAILURE)
781           && (n.severity != NOTIF_WARNING)
782           && (n.severity != NOTIF_OKAY))
783       {
784         INFO ("network plugin: "
785             "Ignoring notification with "
786             "unknown severity %i.",
787             n.severity);
788       }
789       else if (n.time <= 0)
790       {
791         INFO ("network plugin: "
792             "Ignoring notification with "
793             "time == 0.");
794       }
795       else if (strlen (n.message) <= 0)
796       {
797         INFO ("network plugin: "
798             "Ignoring notification with "
799             "an empty message.");
800       }
801       else
802       {
803         plugin_dispatch_notification (&n);
804       }
805     }
806     else if (pkg_type == TYPE_SEVERITY)
807     {
808       uint64_t tmp = 0;
809       status = parse_part_number (&buffer, &buffer_size,
810           &tmp);
811       if (status == 0)
812         n.severity = (int) tmp;
813     }
814     else
815     {
816       DEBUG ("network plugin: parse_packet: Unknown part"
817           " type: 0x%04hx", pkg_type);
818       buffer = ((char *) buffer) + pkg_length;
819     }
820   } /* while (buffer_size > sizeof (part_header_t)) */
821
822   if (status == 0 && buffer_size > 0)
823     WARNING ("network plugin: parse_packet: Received truncated "
824         "packet, try increasing `MaxPacketSize'");
825
826   return (status);
827 } /* }}} int parse_packet */
828
829
830
831
832
833
834
835
836 ////////////////////////////
837 //// END OF COPY / PASTE ///
838 ////////////////////////////
839
840
841 // config data
842 static char *zmq_send_to = NULL;
843
844 // private data
845 static int thread_running = 1;
846 static pthread_t listen_thread_id;
847 static void *push_socket = NULL;
848
849 static void cmq_close_callback (void *socket) /* {{{ */
850 {
851   if (socket != NULL)
852     (void) zmq_close (socket);
853 } /* }}} void cmq_close_callback */
854
855 static void free_data (void *data, void *hint) /* {{{ */
856 {
857   free (data);
858 } /* }}} void free_data */
859
860 static void *receive_thread (void *cmq_socket) /* {{{ */
861 {
862   int status;
863   char *data = NULL;
864   size_t data_size;
865
866   assert (cmq_socket != NULL);
867
868   while (thread_running)
869   {
870     zmq_msg_t msg;
871
872     (void) zmq_msg_init (&msg);
873
874     status = zmq_recv (cmq_socket, &msg, /* flags = */ 0);
875     if (status != 0)
876     {
877       if ((errno == EAGAIN) || (errno == EINTR))
878         continue;
879
880       ERROR ("zeromq plugin: zmq_recv failed: %s", zmq_strerror (errno));
881       break;
882     }
883
884     data = zmq_msg_data (&msg);
885     data_size = zmq_msg_size (&msg);
886
887     status = parse_packet (NULL, data, data_size,
888         /* flags = */ 0,
889         /* username = */ NULL);
890     DEBUG("zeromq plugin: received data, parse returned %d", status);
891
892     (void) zmq_msg_close (&msg);
893   } /* while (thread_running) */
894
895   DEBUG ("zeromq plugin: Receive thread is terminating.");
896   (void) zmq_close (cmq_socket);
897   
898   return (NULL);
899 } /* }}} void *receive_thread */
900
901 #define PACKET_SIZE   512
902
903 static int write_value (const data_set_t *ds, /* {{{ */
904     const value_list_t *vl,
905     user_data_t *user_data)
906 {
907   void *cmq_socket = user_data->data;
908
909   zmq_msg_t msg;
910   char      *send_buffer;
911   int       send_buffer_size = PACKET_SIZE, real_size;
912
913   send_buffer = malloc(PACKET_SIZE);
914   if( send_buffer == NULL ) {
915     ERROR("Unable to allocate memory for send_buffer, aborting write");
916     return 1;
917   }
918
919   // empty buffer
920   memset(send_buffer, 0, PACKET_SIZE);
921
922   real_size = add_to_buffer(send_buffer, send_buffer_size, &send_buffer_vl, ds, vl);
923
924   // zeromq will free the memory when needed by calling the free_data function
925   if( zmq_msg_init_data(&msg, send_buffer, real_size, free_data, NULL) != 0 ) {
926     ERROR("zmq_msg_init : %s", zmq_strerror(errno));
927     return 1;
928   }
929
930   // try to send the message
931   if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
932     if( errno == EAGAIN ) {
933       WARNING("ZeroMQ: Cannot send message, queue is full");
934     }
935     else {
936       ERROR("zmq_send : %s", zmq_strerror(errno));
937       return 1;
938     }
939   }
940
941   DEBUG("ZeroMQ: data sent");
942
943   return 0;
944 } /* }}} int write_value */
945
946 static int cmq_config_mode (oconfig_item_t *ci) /* {{{ */
947 {
948   char buffer[64] = "";
949   int status;
950
951   status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer));
952   if (status != 0)
953     return (-1);
954
955   if (strcasecmp ("Publish", buffer) == 0)
956     return (ZMQ_PUB);
957   else if (strcasecmp ("Subscribe", buffer) == 0)
958     return (ZMQ_SUB);
959   else if (strcasecmp ("Push", buffer) == 0)
960     return (ZMQ_PUSH);
961   else if (strcasecmp ("Pull", buffer) == 0)
962     return (ZMQ_PULL);
963   
964   ERROR ("zeromq plugin: Unrecognized communication pattern: \"%s\"",
965       buffer);
966   return (-1);
967 } /* }}} int cmq_config_mode */
968
969 static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
970 {
971   int type;
972   int status;
973   int i;
974   int endpoints_num;
975   void *cmq_socket;
976
977   type = cmq_config_mode (ci);
978   if (type < 0)
979     return (-1);
980
981   if (cmq_context == NULL)
982   {
983     cmq_context = zmq_init (cmq_threads_num);
984     if (cmq_context == NULL)
985     {
986       ERROR ("zeromq plugin: Initializing ZeroMQ failed: %s",
987           zmq_strerror (errno));
988       return (-1);
989     }
990   }
991
992   /* Create a new socket */
993   cmq_socket = zmq_socket (cmq_context, type);
994   if (cmq_socket == NULL)
995   {
996     ERROR ("zeromq plugin: zmq_socket failed: %s",
997         zmq_strerror (errno));
998     return (-1);
999   }
1000
1001   if (type == ZMQ_SUB)
1002   {
1003     /* Subscribe to all messages */
1004     status = zmq_setsockopt (cmq_socket, ZMQ_SUBSCRIBE,
1005         /* prefix = */ "", /* prefix length = */ 0);
1006     if (status != 0)
1007     {
1008       ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_SUBSCRIBE) failed: %s",
1009           zmq_strerror (errno));
1010       (void) zmq_close (cmq_socket);
1011       return (-1);
1012     }
1013   }
1014
1015   /* Iterate over all children and do all the binds and connects requested. */
1016   endpoints_num = 0;
1017   for (i = 0; i < ci->children_num; i++)
1018   {
1019     oconfig_item_t *child = ci->children + i;
1020
1021     if (strcasecmp ("Endpoint", child->key) == 0)
1022     {
1023       char *value = NULL;
1024
1025       status = cf_util_get_string (child, &value);
1026       if (status != 0)
1027         continue;
1028
1029       if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
1030       {
1031         DEBUG("Binding to %s", value);
1032         status = zmq_bind (cmq_socket, value);
1033         if (status != 0)
1034         {
1035           ERROR ("zeromq plugin: zmq_bind (\"%s\") failed: %s",
1036               value, zmq_strerror (errno));
1037           sfree (value);
1038           continue;
1039         }
1040       }
1041       else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
1042       {
1043         DEBUG("Connecting to %s", value);
1044         status = zmq_connect (cmq_socket, value);
1045         if (status != 0)
1046         {
1047           ERROR ("zeromq plugin: zmq_connect (\"%s\") failed: %s",
1048               value, zmq_strerror (errno));
1049           sfree (value);
1050           continue;
1051         }
1052       }
1053       else
1054       {
1055         assert (23 == 42);
1056       }
1057       
1058       sfree (value);
1059
1060       endpoints_num++;
1061       continue;
1062     } /* Endpoint */
1063     else
1064     {
1065       ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
1066           child->key);
1067     }
1068   } /* for (i = 0; i < ci->children_num; i++) */
1069
1070   if (endpoints_num == 0)
1071   {
1072     ERROR ("zeromq plugin: No (valid) \"Endpoint\" option was found in this "
1073         "\"Socket\" block.");
1074     (void) zmq_close (cmq_socket);
1075     return (-1);
1076   }
1077
1078   /* If this is a receiving socket, create a new receive thread */
1079   if ((type == ZMQ_SUB) || (type == ZMQ_PULL))
1080   {
1081     pthread_t *thread_ptr;
1082
1083     thread_ptr = realloc (receive_thread_ids,
1084         sizeof (*receive_thread_ids) * (receive_thread_num + 1));
1085     if (thread_ptr == NULL)
1086     {
1087       ERROR ("zeromq plugin: realloc failed.");
1088       return (-1);
1089     }
1090     receive_thread_ids = thread_ptr;
1091     thread_ptr = receive_thread_ids + receive_thread_num;
1092
1093     status = pthread_create (thread_ptr,
1094         /* attr = */ NULL,
1095         /* func = */ receive_thread,
1096         /* args = */ cmq_socket);
1097     if (status != 0)
1098     {
1099       char errbuf[1024];
1100       ERROR ("zeromq plugin: pthread_create failed: %s",
1101           sstrerror (errno, errbuf, sizeof (errbuf)));
1102       (void) zmq_close (cmq_socket);
1103       return (-1);
1104     }
1105
1106     receive_thread_num++;
1107   }
1108
1109   /* If this is a sending socket, register a new write function */
1110   else if ((type == ZMQ_PUB) || (type == ZMQ_PUSH))
1111   {
1112     user_data_t ud = { NULL, NULL };
1113     char name[32];
1114
1115     ud.data = cmq_socket;
1116     ud.free_func = cmq_close_callback;
1117
1118     ssnprintf (name, sizeof (name), "zeromq/%i", sending_sockets_num);
1119     sending_sockets_num++;
1120
1121     plugin_register_write (name, write_value, &ud);
1122   }
1123
1124   return (0);
1125 } /* }}} int cmq_config_socket */
1126
1127 /*
1128  * Config schema:
1129  *
1130  * <Plugin "zeromq">
1131  *   <Socket Publish>
1132  *     Endpoint "tcp://localhost:6666"
1133  *   </Socket>
1134  *   <Socket Subscribe>
1135  *     Endpoint "tcp://eth0:6666"
1136  *     Endpoint "tcp://collectd.example.com:6666"
1137  *   </Socket>
1138  * </Plugin>
1139  */
1140 static int cmq_config (oconfig_item_t *ci) /* {{{ */
1141 {
1142   int status;
1143   int i;
1144   
1145   for (i = 0; i < ci->children_num; i++)
1146   {
1147     oconfig_item_t *child = ci->children + i;
1148
1149     if (strcasecmp ("Socket", child->key) == 0)
1150       status = cmq_config_socket (child);
1151     else if (strcasecmp ("Threads", child->key) == 0)
1152     {
1153       int tmp = 0;
1154       status = cf_util_get_int (child, &tmp);
1155       if ((status == 0) && (tmp >= 1))
1156         cmq_threads_num = tmp;
1157     }
1158     else
1159     {
1160       WARNING ("zeromq plugin: The \"%s\" config option is not allowed here.",
1161           child->key);
1162     }
1163   }
1164
1165   return (0);
1166 } /* }}} int cmq_config */
1167
1168 static int plugin_init (void)
1169 {
1170   int major, minor, patch;
1171   zmq_version (&major, &minor, &patch);
1172   
1173   /* init zeromq (1 I/O thread) */
1174   if (cmq_context == NULL)
1175     cmq_context = zmq_init(1);
1176
1177   if( cmq_context == NULL ) {
1178     ERROR("zmq_init : %s", zmq_strerror(errno));
1179     return 1;
1180   }
1181   
1182   // start send socket
1183   if( zmq_send_to != NULL ) {
1184     push_socket = zmq_socket(cmq_context, ZMQ_PUSH);
1185     
1186     if( push_socket == NULL ) {
1187       ERROR("zmq_socket : %s", zmq_strerror(errno));
1188       return 1;
1189     }
1190     
1191     // and connect to remote host
1192     if( zmq_connect(push_socket, zmq_send_to) != 0 ) {
1193       ERROR("zmq_connect : %s", zmq_strerror(errno));
1194       return 1;
1195     }
1196     
1197     INFO("ZeroMQ pushing to %s", zmq_send_to);
1198   }
1199   
1200   
1201   
1202   INFO("ZeroMQ plugin initialized (zeromq v%d.%d.%d).", major, minor, patch);
1203   return 0;
1204 }
1205
1206
1207
1208 static int write_notification (const notification_t *n, user_data_t __attribute__((unused)) *user_data)
1209 {
1210   DEBUG("ZeroMQ: received notification, not implemented yet");
1211   return 0;
1212 }
1213
1214 static int my_shutdown (void)
1215 {
1216   if( cmq_context ) {
1217     
1218     thread_running = 0;
1219     
1220     DEBUG("ZeroMQ: shutting down");
1221     
1222     if( zmq_term(cmq_context) != 0 ) {
1223       ERROR("zmq_term : %s", zmq_strerror(errno));
1224       return 1;
1225     }
1226     
1227     pthread_join(listen_thread_id, NULL);
1228   }
1229   
1230   return 0;
1231 }
1232
1233 void module_register (void)
1234 {
1235   plugin_register_complex_config("zeromq", cmq_config);
1236   plugin_register_init("zeromq", plugin_init);
1237   plugin_register_notification ("network", write_notification,
1238       /* user_data = */ NULL);
1239   plugin_register_shutdown ("zeromq", my_shutdown);
1240 }
1241