Merge branch 'collectd-4.6'
[collectd.git] / src / network.c
1 /**
2  * collectd - src/network.c
3  * Copyright (C) 2005-2008  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27
28 #include "network.h"
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33 #if HAVE_SYS_SOCKET_H
34 # include <sys/socket.h>
35 #endif
36 #if HAVE_NETDB_H
37 # include <netdb.h>
38 #endif
39 #if HAVE_NETINET_IN_H
40 # include <netinet/in.h>
41 #endif
42 #if HAVE_ARPA_INET_H
43 # include <arpa/inet.h>
44 #endif
45 #if HAVE_POLL_H
46 # include <poll.h>
47 #endif
48
49 /* 1500 - 40 - 8  =  Ethernet packet - IPv6 header - UDP header */
50 /* #define BUFF_SIZE 1452 */
51
52 #ifndef IPV6_ADD_MEMBERSHIP
53 # ifdef IPV6_JOIN_GROUP
54 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
55 # else
56 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
57 # endif
58 #endif /* !IP_ADD_MEMBERSHIP */
59
60 #define BUFF_SIZE 1024
61
62 /*
63  * Private data types
64  */
65 typedef struct sockent
66 {
67         int                      fd;
68         struct sockaddr_storage *addr;
69         socklen_t                addrlen;
70         struct sockent          *next;
71 } sockent_t;
72
73 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
74  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
75  * +-------+-----------------------+-------------------------------+
76  * ! Ver.  !                       ! Length                        !
77  * +-------+-----------------------+-------------------------------+
78  */
79 struct part_header_s
80 {
81         uint16_t type;
82         uint16_t length;
83 };
84 typedef struct part_header_s part_header_t;
85
86 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
87  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
88  * +-------------------------------+-------------------------------+
89  * ! Type                          ! Length                        !
90  * +-------------------------------+-------------------------------+
91  * : (Length - 4) Bytes                                            :
92  * +---------------------------------------------------------------+
93  */
94 struct part_string_s
95 {
96         part_header_t *head;
97         char *value;
98 };
99 typedef struct part_string_s part_string_t;
100
101 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
102  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
103  * +-------------------------------+-------------------------------+
104  * ! Type                          ! Length                        !
105  * +-------------------------------+-------------------------------+
106  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
107  * +---------------------------------------------------------------+
108  */
109 struct part_number_s
110 {
111         part_header_t *head;
112         uint64_t *value;
113 };
114 typedef struct part_number_s part_number_t;
115
116 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
117  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
118  * +-------------------------------+-------------------------------+
119  * ! Type                          ! Length                        !
120  * +-------------------------------+---------------+---------------+
121  * ! Num of values                 ! Type0         ! Type1         !
122  * +-------------------------------+---------------+---------------+
123  * ! Value0                                                        !
124  * !                                                               !
125  * +---------------------------------------------------------------+
126  * ! Value1                                                        !
127  * !                                                               !
128  * +---------------------------------------------------------------+
129  */
130 struct part_values_s
131 {
132         part_header_t *head;
133         uint16_t *num_values;
134         uint8_t  *values_types;
135         value_t  *values;
136 };
137 typedef struct part_values_s part_values_t;
138
139 struct receive_list_entry_s
140 {
141   char data[BUFF_SIZE];
142   int  data_len;
143   struct receive_list_entry_s *next;
144 };
145 typedef struct receive_list_entry_s receive_list_entry_t;
146
147 /*
148  * Private variables
149  */
150 static const char *config_keys[] =
151 {
152         "CacheFlush",
153         "Listen",
154         "Server",
155         "TimeToLive",
156         "Forward"
157 };
158 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
159
160 static int network_config_ttl = 0;
161 static int network_config_forward = 0;
162
163 static sockent_t *sending_sockets = NULL;
164
165 static receive_list_entry_t *receive_list_head = NULL;
166 static receive_list_entry_t *receive_list_tail = NULL;
167 static pthread_mutex_t       receive_list_lock = PTHREAD_MUTEX_INITIALIZER;
168 static pthread_cond_t        receive_list_cond = PTHREAD_COND_INITIALIZER;
169
170 static struct pollfd *listen_sockets = NULL;
171 static int            listen_sockets_num = 0;
172
173 /* The receive and dispatch threads will run as long as `listen_loop' is set to
174  * zero. */
175 static int       listen_loop = 0;
176 static int       receive_thread_running = 0;
177 static pthread_t receive_thread_id;
178 static int       dispatch_thread_running = 0;
179 static pthread_t dispatch_thread_id;
180
181 /* Buffer in which to-be-sent network packets are constructed. */
182 static char             send_buffer[BUFF_SIZE];
183 static char            *send_buffer_ptr;
184 static int              send_buffer_fill;
185 static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
186 static pthread_mutex_t  send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
187
188 /* In this cache we store all the values we received, so we can send out only
189  * those values which were *not* received via the network plugin, too. This is
190  * used for the `Forward false' option. */
191 static c_avl_tree_t    *cache_tree = NULL;
192 static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
193 static time_t           cache_flush_last = 0;
194 static int              cache_flush_interval = 1800;
195
196 /*
197  * Private functions
198  */
199 static int cache_flush (void)
200 {
201         char **keys = NULL;
202         int    keys_num = 0;
203
204         char **tmp;
205         int    i;
206
207         char   *key;
208         time_t *value;
209         c_avl_iterator_t *iter;
210
211         time_t curtime = time (NULL);
212
213         iter = c_avl_get_iterator (cache_tree);
214         while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
215         {
216                 if ((curtime - *value) <= cache_flush_interval)
217                         continue;
218                 tmp = (char **) realloc (keys,
219                                 (keys_num + 1) * sizeof (char *));
220                 if (tmp == NULL)
221                 {
222                         sfree (keys);
223                         c_avl_iterator_destroy (iter);
224                         ERROR ("network plugin: cache_flush: realloc"
225                                         " failed.");
226                         return (-1);
227                 }
228                 keys = tmp;
229                 keys[keys_num] = key;
230                 keys_num++;
231         } /* while (c_avl_iterator_next) */
232         c_avl_iterator_destroy (iter);
233
234         for (i = 0; i < keys_num; i++)
235         {
236                 if (c_avl_remove (cache_tree, keys[i], (void *) &key,
237                                         (void *) &value) != 0)
238                 {
239                         WARNING ("network plugin: cache_flush: c_avl_remove"
240                                         " (%s) failed.", keys[i]);
241                         continue;
242                 }
243
244                 sfree (key);
245                 sfree (value);
246         }
247
248         sfree (keys);
249
250         DEBUG ("network plugin: cache_flush: Removed %i %s",
251                         keys_num, (keys_num == 1) ? "entry" : "entries");
252         cache_flush_last = curtime;
253         return (0);
254 } /* int cache_flush */
255
256 static int cache_check (const value_list_t *vl)
257 {
258         char key[1024];
259         time_t *value = NULL;
260         int retval = -1;
261
262         if (cache_tree == NULL)
263                 return (-1);
264
265         if (format_name (key, sizeof (key), vl->host, vl->plugin,
266                                 vl->plugin_instance, vl->type, vl->type_instance))
267                 return (-1);
268
269         pthread_mutex_lock (&cache_lock);
270
271         if (c_avl_get (cache_tree, key, (void *) &value) == 0)
272         {
273                 if (*value < vl->time)
274                 {
275                         *value = vl->time;
276                         retval = 0;
277                 }
278                 else
279                 {
280                         DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i",
281                                         (int) *value, (int) vl->time);
282                         retval = 1;
283                 }
284         }
285         else
286         {
287                 char *key_copy = strdup (key);
288                 value = malloc (sizeof (time_t));
289                 if ((key_copy != NULL) && (value != NULL))
290                 {
291                         *value = vl->time;
292                         c_avl_insert (cache_tree, key_copy, value);
293                         retval = 0;
294                 }
295                 else
296                 {
297                         sfree (key_copy);
298                         sfree (value);
299                 }
300         }
301
302         if ((time (NULL) - cache_flush_last) > cache_flush_interval)
303                 cache_flush ();
304
305         pthread_mutex_unlock (&cache_lock);
306
307         DEBUG ("network plugin: cache_check: key = %s; time = %i; retval = %i",
308                         key, (int) vl->time, retval);
309
310         return (retval);
311 } /* int cache_check */
312
313 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
314                 const data_set_t *ds, const value_list_t *vl)
315 {
316         char *packet_ptr;
317         int packet_len;
318         int num_values;
319
320         part_header_t pkg_ph;
321         uint16_t      pkg_num_values;
322         uint8_t      *pkg_values_types;
323         value_t      *pkg_values;
324
325         int offset;
326         int i;
327
328         num_values = vl->values_len;
329         packet_len = sizeof (part_header_t) + sizeof (uint16_t)
330                 + (num_values * sizeof (uint8_t))
331                 + (num_values * sizeof (value_t));
332
333         if (*ret_buffer_len < packet_len)
334                 return (-1);
335
336         pkg_values_types = (uint8_t *) malloc (num_values * sizeof (uint8_t));
337         if (pkg_values_types == NULL)
338         {
339                 ERROR ("network plugin: write_part_values: malloc failed.");
340                 return (-1);
341         }
342
343         pkg_values = (value_t *) malloc (num_values * sizeof (value_t));
344         if (pkg_values == NULL)
345         {
346                 free (pkg_values_types);
347                 ERROR ("network plugin: write_part_values: malloc failed.");
348                 return (-1);
349         }
350
351         pkg_ph.type = htons (TYPE_VALUES);
352         pkg_ph.length = htons (packet_len);
353
354         pkg_num_values = htons ((uint16_t) vl->values_len);
355
356         for (i = 0; i < num_values; i++)
357         {
358                 if (ds->ds[i].type == DS_TYPE_COUNTER)
359                 {
360                         pkg_values_types[i] = DS_TYPE_COUNTER;
361                         pkg_values[i].counter = htonll (vl->values[i].counter);
362                 }
363                 else
364                 {
365                         pkg_values_types[i] = DS_TYPE_GAUGE;
366                         pkg_values[i].gauge = htond (vl->values[i].gauge);
367                 }
368         }
369
370         /*
371          * Use `memcpy' to write everything to the buffer, because the pointer
372          * may be unaligned and some architectures, such as SPARC, can't handle
373          * that.
374          */
375         packet_ptr = *ret_buffer;
376         offset = 0;
377         memcpy (packet_ptr + offset, &pkg_ph, sizeof (pkg_ph));
378         offset += sizeof (pkg_ph);
379         memcpy (packet_ptr + offset, &pkg_num_values, sizeof (pkg_num_values));
380         offset += sizeof (pkg_num_values);
381         memcpy (packet_ptr + offset, pkg_values_types, num_values * sizeof (uint8_t));
382         offset += num_values * sizeof (uint8_t);
383         memcpy (packet_ptr + offset, pkg_values, num_values * sizeof (value_t));
384         offset += num_values * sizeof (value_t);
385
386         assert (offset == packet_len);
387
388         *ret_buffer = packet_ptr + packet_len;
389         *ret_buffer_len -= packet_len;
390
391         free (pkg_values_types);
392         free (pkg_values);
393
394         return (0);
395 } /* int write_part_values */
396
397 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
398                 int type, uint64_t value)
399 {
400         char *packet_ptr;
401         int packet_len;
402
403         part_header_t pkg_head;
404         uint64_t pkg_value;
405         
406         int offset;
407
408         packet_len = sizeof (pkg_head) + sizeof (pkg_value);
409
410         if (*ret_buffer_len < packet_len)
411                 return (-1);
412
413         pkg_head.type = htons (type);
414         pkg_head.length = htons (packet_len);
415         pkg_value = htonll (value);
416
417         packet_ptr = *ret_buffer;
418         offset = 0;
419         memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
420         offset += sizeof (pkg_head);
421         memcpy (packet_ptr + offset, &pkg_value, sizeof (pkg_value));
422         offset += sizeof (pkg_value);
423
424         assert (offset == packet_len);
425
426         *ret_buffer = packet_ptr + packet_len;
427         *ret_buffer_len -= packet_len;
428
429         return (0);
430 } /* int write_part_number */
431
432 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
433                 int type, const char *str, int str_len)
434 {
435         char *buffer;
436         int buffer_len;
437
438         uint16_t pkg_type;
439         uint16_t pkg_length;
440
441         int offset;
442
443         buffer_len = 2 * sizeof (uint16_t) + str_len + 1;
444         if (*ret_buffer_len < buffer_len)
445                 return (-1);
446
447         pkg_type = htons (type);
448         pkg_length = htons (buffer_len);
449
450         buffer = *ret_buffer;
451         offset = 0;
452         memcpy (buffer + offset, (void *) &pkg_type, sizeof (pkg_type));
453         offset += sizeof (pkg_type);
454         memcpy (buffer + offset, (void *) &pkg_length, sizeof (pkg_length));
455         offset += sizeof (pkg_length);
456         memcpy (buffer + offset, str, str_len);
457         offset += str_len;
458         memset (buffer + offset, '\0', 1);
459         offset += 1;
460
461         assert (offset == buffer_len);
462
463         *ret_buffer = buffer + buffer_len;
464         *ret_buffer_len -= buffer_len;
465
466         return (0);
467 } /* int write_part_string */
468
469 static int parse_part_values (void **ret_buffer, int *ret_buffer_len,
470                 value_t **ret_values, int *ret_num_values)
471 {
472         char *buffer = *ret_buffer;
473         int   buffer_len = *ret_buffer_len;
474
475         uint16_t tmp16;
476         size_t exp_size;
477         int   i;
478
479         uint16_t pkg_length;
480         uint16_t pkg_type;
481         uint16_t pkg_numval;
482
483         uint8_t *pkg_types;
484         value_t *pkg_values;
485
486         if (buffer_len < (15))
487         {
488                 DEBUG ("network plugin: packet is too short: buffer_len = %i",
489                                 buffer_len);
490                 return (-1);
491         }
492
493         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
494         buffer += sizeof (tmp16);
495         pkg_type = ntohs (tmp16);
496
497         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
498         buffer += sizeof (tmp16);
499         pkg_length = ntohs (tmp16);
500
501         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
502         buffer += sizeof (tmp16);
503         pkg_numval = ntohs (tmp16);
504
505         assert (pkg_type == TYPE_VALUES);
506
507         exp_size = 3 * sizeof (uint16_t)
508                 + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
509         if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
510         {
511                 WARNING ("network plugin: parse_part_values: "
512                                 "Packet too short: "
513                                 "Chunk of size %u expected, "
514                                 "but buffer has only %i bytes left.",
515                                 (unsigned int) exp_size, buffer_len);
516                 return (-1);
517         }
518
519         if (pkg_length != exp_size)
520         {
521                 WARNING ("network plugin: parse_part_values: "
522                                 "Length and number of values "
523                                 "in the packet don't match.");
524                 return (-1);
525         }
526
527         pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
528         pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
529         if ((pkg_types == NULL) || (pkg_values == NULL))
530         {
531                 sfree (pkg_types);
532                 sfree (pkg_values);
533                 ERROR ("network plugin: parse_part_values: malloc failed.");
534                 return (-1);
535         }
536
537         memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
538         buffer += pkg_numval * sizeof (uint8_t);
539         memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
540         buffer += pkg_numval * sizeof (value_t);
541
542         for (i = 0; i < pkg_numval; i++)
543         {
544                 if (pkg_types[i] == DS_TYPE_COUNTER)
545                         pkg_values[i].counter = ntohll (pkg_values[i].counter);
546                 else if (pkg_types[i] == DS_TYPE_GAUGE)
547                         pkg_values[i].gauge = ntohd (pkg_values[i].gauge);
548         }
549
550         *ret_buffer     = buffer;
551         *ret_buffer_len = buffer_len - pkg_length;
552         *ret_num_values = pkg_numval;
553         *ret_values     = pkg_values;
554
555         sfree (pkg_types);
556
557         return (0);
558 } /* int parse_part_values */
559
560 static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
561                 uint64_t *value)
562 {
563         char *buffer = *ret_buffer;
564         int buffer_len = *ret_buffer_len;
565
566         uint16_t tmp16;
567         uint64_t tmp64;
568         size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
569
570         uint16_t pkg_length;
571         uint16_t pkg_type;
572
573         if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
574         {
575                 WARNING ("network plugin: parse_part_number: "
576                                 "Packet too short: "
577                                 "Chunk of size %u expected, "
578                                 "but buffer has only %i bytes left.",
579                                 (unsigned int) exp_size, buffer_len);
580                 return (-1);
581         }
582
583         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
584         buffer += sizeof (tmp16);
585         pkg_type = ntohs (tmp16);
586
587         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
588         buffer += sizeof (tmp16);
589         pkg_length = ntohs (tmp16);
590
591         memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
592         buffer += sizeof (tmp64);
593         *value = ntohll (tmp64);
594
595         *ret_buffer = buffer;
596         *ret_buffer_len = buffer_len - pkg_length;
597
598         return (0);
599 } /* int parse_part_number */
600
601 static int parse_part_string (void **ret_buffer, int *ret_buffer_len,
602                 char *output, int output_len)
603 {
604         char *buffer = *ret_buffer;
605         int   buffer_len = *ret_buffer_len;
606
607         uint16_t tmp16;
608         size_t header_size = 2 * sizeof (uint16_t);
609
610         uint16_t pkg_length;
611         uint16_t pkg_type;
612
613         if ((buffer_len < 0) || ((size_t) buffer_len < header_size))
614         {
615                 WARNING ("network plugin: parse_part_string: "
616                                 "Packet too short: "
617                                 "Chunk of at least size %u expected, "
618                                 "but buffer has only %i bytes left.",
619                                 (unsigned int) header_size, buffer_len);
620                 return (-1);
621         }
622
623         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
624         buffer += sizeof (tmp16);
625         pkg_type = ntohs (tmp16);
626
627         memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
628         buffer += sizeof (tmp16);
629         pkg_length = ntohs (tmp16);
630
631         /* Check that packet fits in the input buffer */
632         if (pkg_length > buffer_len)
633         {
634                 WARNING ("network plugin: parse_part_string: "
635                                 "Packet too big: "
636                                 "Chunk of size %hu received, "
637                                 "but buffer has only %i bytes left.",
638                                 pkg_length, buffer_len);
639                 return (-1);
640         }
641
642         /* Check that pkg_length is in the valid range */
643         if (pkg_length <= header_size)
644         {
645                 WARNING ("network plugin: parse_part_string: "
646                                 "Packet too short: "
647                                 "Header claims this packet is only %hu "
648                                 "bytes long.", pkg_length);
649                 return (-1);
650         }
651
652         /* Check that the package data fits into the output buffer.
653          * The previous if-statement ensures that:
654          * `pkg_length > header_size' */
655         if ((output_len < 0)
656                         || ((size_t) output_len < ((size_t) pkg_length - header_size)))
657         {
658                 WARNING ("network plugin: parse_part_string: "
659                                 "Output buffer too small.");
660                 return (-1);
661         }
662
663         /* All sanity checks successfull, let's copy the data over */
664         output_len = pkg_length - header_size;
665         memcpy ((void *) output, (void *) buffer, output_len);
666         buffer += output_len;
667
668         /* For some very weird reason '\0' doesn't do the trick on SPARC in
669          * this statement. */
670         if (output[output_len - 1] != 0)
671         {
672                 WARNING ("network plugin: parse_part_string: "
673                                 "Received string does not end "
674                                 "with a NULL-byte.");
675                 return (-1);
676         }
677
678         *ret_buffer = buffer;
679         *ret_buffer_len = buffer_len - pkg_length;
680
681         return (0);
682 } /* int parse_part_string */
683
684 static int parse_packet (void *buffer, int buffer_len)
685 {
686         int status;
687
688         value_list_t vl = VALUE_LIST_INIT;
689         notification_t n;
690
691         DEBUG ("network plugin: parse_packet: buffer = %p; buffer_len = %i;",
692                         buffer, buffer_len);
693
694         memset (&vl, '\0', sizeof (vl));
695         memset (&n, '\0', sizeof (n));
696         status = 0;
697
698         while ((status == 0) && (0 < buffer_len)
699                         && ((unsigned int) buffer_len > sizeof (part_header_t)))
700         {
701                 uint16_t pkg_length;
702                 uint16_t pkg_type;
703
704                 memcpy ((void *) &pkg_type,
705                                 (void *) buffer,
706                                 sizeof (pkg_type));
707                 memcpy ((void *) &pkg_length,
708                                 (void *) (buffer + sizeof (pkg_type)),
709                                 sizeof (pkg_length));
710
711                 pkg_length = ntohs (pkg_length);
712                 pkg_type = ntohs (pkg_type);
713
714                 if (pkg_length > buffer_len)
715                         break;
716                 /* Ensure that this loop terminates eventually */
717                 if (pkg_length < (2 * sizeof (uint16_t)))
718                         break;
719
720                 if (pkg_type == TYPE_VALUES)
721                 {
722                         status = parse_part_values (&buffer, &buffer_len,
723                                         &vl.values, &vl.values_len);
724
725                         if (status != 0)
726                                 break;
727
728                         if ((vl.time > 0)
729                                         && (strlen (vl.host) > 0)
730                                         && (strlen (vl.plugin) > 0)
731                                         && (strlen (vl.type) > 0)
732                                         && (cache_check (&vl) == 0))
733                         {
734                                 plugin_dispatch_values (&vl);
735                         }
736                         else
737                         {
738                                 DEBUG ("network plugin: parse_packet:"
739                                                 " NOT dispatching values");
740                         }
741
742                         sfree (vl.values);
743                 }
744                 else if (pkg_type == TYPE_TIME)
745                 {
746                         uint64_t tmp = 0;
747                         status = parse_part_number (&buffer, &buffer_len,
748                                         &tmp);
749                         if (status == 0)
750                         {
751                                 vl.time = (time_t) tmp;
752                                 n.time = (time_t) tmp;
753                         }
754                 }
755                 else if (pkg_type == TYPE_INTERVAL)
756                 {
757                         uint64_t tmp = 0;
758                         status = parse_part_number (&buffer, &buffer_len,
759                                         &tmp);
760                         if (status == 0)
761                                 vl.interval = (int) tmp;
762                 }
763                 else if (pkg_type == TYPE_HOST)
764                 {
765                         status = parse_part_string (&buffer, &buffer_len,
766                                         vl.host, sizeof (vl.host));
767                         if (status == 0)
768                                 sstrncpy (n.host, vl.host, sizeof (n.host));
769                 }
770                 else if (pkg_type == TYPE_PLUGIN)
771                 {
772                         status = parse_part_string (&buffer, &buffer_len,
773                                         vl.plugin, sizeof (vl.plugin));
774                         if (status == 0)
775                                 sstrncpy (n.plugin, vl.plugin,
776                                                 sizeof (n.plugin));
777                 }
778                 else if (pkg_type == TYPE_PLUGIN_INSTANCE)
779                 {
780                         status = parse_part_string (&buffer, &buffer_len,
781                                         vl.plugin_instance,
782                                         sizeof (vl.plugin_instance));
783                         if (status == 0)
784                                 sstrncpy (n.plugin_instance,
785                                                 vl.plugin_instance,
786                                                 sizeof (n.plugin_instance));
787                 }
788                 else if (pkg_type == TYPE_TYPE)
789                 {
790                         status = parse_part_string (&buffer, &buffer_len,
791                                         vl.type, sizeof (vl.type));
792                         if (status == 0)
793                                 sstrncpy (n.type, vl.type, sizeof (n.type));
794                 }
795                 else if (pkg_type == TYPE_TYPE_INSTANCE)
796                 {
797                         status = parse_part_string (&buffer, &buffer_len,
798                                         vl.type_instance,
799                                         sizeof (vl.type_instance));
800                         if (status == 0)
801                                 sstrncpy (n.type_instance, vl.type_instance,
802                                                 sizeof (n.type_instance));
803                 }
804                 else if (pkg_type == TYPE_MESSAGE)
805                 {
806                         status = parse_part_string (&buffer, &buffer_len,
807                                         n.message, sizeof (n.message));
808
809                         if (status != 0)
810                         {
811                                 /* do nothing */
812                         }
813                         else if ((n.severity != NOTIF_FAILURE)
814                                         && (n.severity != NOTIF_WARNING)
815                                         && (n.severity != NOTIF_OKAY))
816                         {
817                                 INFO ("network plugin: "
818                                                 "Ignoring notification with "
819                                                 "unknown severity %i.",
820                                                 n.severity);
821                         }
822                         else if (n.time <= 0)
823                         {
824                                 INFO ("network plugin: "
825                                                 "Ignoring notification with "
826                                                 "time == 0.");
827                         }
828                         else if (strlen (n.message) <= 0)
829                         {
830                                 INFO ("network plugin: "
831                                                 "Ignoring notification with "
832                                                 "an empty message.");
833                         }
834                         else
835                         {
836                                 plugin_dispatch_notification (&n);
837                         }
838                 }
839                 else if (pkg_type == TYPE_SEVERITY)
840                 {
841                         uint64_t tmp = 0;
842                         status = parse_part_number (&buffer, &buffer_len,
843                                         &tmp);
844                         if (status == 0)
845                                 n.severity = (int) tmp;
846                 }
847                 else
848                 {
849                         DEBUG ("network plugin: parse_packet: Unknown part"
850                                         " type: 0x%04hx", pkg_type);
851                         buffer = ((char *) buffer) + pkg_length;
852                 }
853         } /* while (buffer_len > sizeof (part_header_t)) */
854
855         return (status);
856 } /* int parse_packet */
857
858 static void free_sockent (sockent_t *se)
859 {
860         sockent_t *next;
861         while (se != NULL)
862         {
863                 next = se->next;
864                 free (se->addr);
865                 free (se);
866                 se = next;
867         }
868 } /* void free_sockent */
869
870 /*
871  * int network_set_ttl
872  *
873  * Set the `IP_MULTICAST_TTL', `IP_TTL', `IPV6_MULTICAST_HOPS' or
874  * `IPV6_UNICAST_HOPS', depending on which option is applicable.
875  *
876  * The `struct addrinfo' is used to destinguish between unicast and multicast
877  * sockets.
878  */
879 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
880 {
881         if ((network_config_ttl < 1) || (network_config_ttl > 255))
882                 return (-1);
883
884         DEBUG ("ttl = %i", network_config_ttl);
885
886         if (ai->ai_family == AF_INET)
887         {
888                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
889                 int optname;
890
891                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
892                         optname = IP_MULTICAST_TTL;
893                 else
894                         optname = IP_TTL;
895
896                 if (setsockopt (se->fd, IPPROTO_IP, optname,
897                                         &network_config_ttl,
898                                         sizeof (network_config_ttl)) == -1)
899                 {
900                         char errbuf[1024];
901                         ERROR ("setsockopt: %s",
902                                         sstrerror (errno, errbuf, sizeof (errbuf)));
903                         return (-1);
904                 }
905         }
906         else if (ai->ai_family == AF_INET6)
907         {
908                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
909                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
910                 int optname;
911
912                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
913                         optname = IPV6_MULTICAST_HOPS;
914                 else
915                         optname = IPV6_UNICAST_HOPS;
916
917                 if (setsockopt (se->fd, IPPROTO_IPV6, optname,
918                                         &network_config_ttl,
919                                         sizeof (network_config_ttl)) == -1)
920                 {
921                         char errbuf[1024];
922                         ERROR ("setsockopt: %s",
923                                         sstrerror (errno, errbuf,
924                                                 sizeof (errbuf)));
925                         return (-1);
926                 }
927         }
928
929         return (0);
930 } /* int network_set_ttl */
931
932 static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
933 {
934         int loop = 0;
935         int yes  = 1;
936
937         /* allow multiple sockets to use the same PORT number */
938         if (setsockopt(se->fd, SOL_SOCKET, SO_REUSEADDR,
939                                 &yes, sizeof(yes)) == -1) {
940                 char errbuf[1024];
941                 ERROR ("setsockopt: %s", 
942                                 sstrerror (errno, errbuf, sizeof (errbuf)));
943                 return (-1);
944         }
945
946         DEBUG ("fd = %i; calling `bind'", se->fd);
947
948         if (bind (se->fd, ai->ai_addr, ai->ai_addrlen) == -1)
949         {
950                 char errbuf[1024];
951                 ERROR ("bind: %s",
952                                 sstrerror (errno, errbuf, sizeof (errbuf)));
953                 return (-1);
954         }
955
956         if (ai->ai_family == AF_INET)
957         {
958                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
959                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
960                 {
961                         struct ip_mreq mreq;
962
963                         DEBUG ("fd = %i; IPv4 multicast address found", se->fd);
964
965                         mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
966                         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
967
968                         if (setsockopt (se->fd, IPPROTO_IP, IP_MULTICAST_LOOP,
969                                                 &loop, sizeof (loop)) == -1)
970                         {
971                                 char errbuf[1024];
972                                 ERROR ("setsockopt: %s",
973                                                 sstrerror (errno, errbuf,
974                                                         sizeof (errbuf)));
975                                 return (-1);
976                         }
977
978                         if (setsockopt (se->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
979                                                 &mreq, sizeof (mreq)) == -1)
980                         {
981                                 char errbuf[1024];
982                                 ERROR ("setsockopt: %s",
983                                                 sstrerror (errno, errbuf,
984                                                         sizeof (errbuf)));
985                                 return (-1);
986                         }
987                 }
988         }
989         else if (ai->ai_family == AF_INET6)
990         {
991                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
992                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
993                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
994                 {
995                         struct ipv6_mreq mreq;
996
997                         DEBUG ("fd = %i; IPv6 multicast address found", se->fd);
998
999                         memcpy (&mreq.ipv6mr_multiaddr,
1000                                         &addr->sin6_addr,
1001                                         sizeof (addr->sin6_addr));
1002
1003                         /* http://developer.apple.com/documentation/Darwin/Reference/ManPages/man4/ip6.4.html
1004                          * ipv6mr_interface may be set to zeroes to
1005                          * choose the default multicast interface or to
1006                          * the index of a particular multicast-capable
1007                          * interface if the host is multihomed.
1008                          * Membership is associ-associated with a
1009                          * single interface; programs running on
1010                          * multihomed hosts may need to join the same
1011                          * group on more than one interface.*/
1012                         mreq.ipv6mr_interface = 0;
1013
1014                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
1015                                                 &loop, sizeof (loop)) == -1)
1016                         {
1017                                 char errbuf[1024];
1018                                 ERROR ("setsockopt: %s",
1019                                                 sstrerror (errno, errbuf,
1020                                                         sizeof (errbuf)));
1021                                 return (-1);
1022                         }
1023
1024                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
1025                                                 &mreq, sizeof (mreq)) == -1)
1026                         {
1027                                 char errbuf[1024];
1028                                 ERROR ("setsockopt: %s",
1029                                                 sstrerror (errno, errbuf,
1030                                                         sizeof (errbuf)));
1031                                 return (-1);
1032                         }
1033                 }
1034         }
1035
1036         return (0);
1037 } /* int network_bind_socket */
1038
1039 static sockent_t *network_create_socket (const char *node,
1040                 const char *service,
1041                 int listen)
1042 {
1043         struct addrinfo  ai_hints;
1044         struct addrinfo *ai_list, *ai_ptr;
1045         int              ai_return;
1046
1047         sockent_t *se_head = NULL;
1048         sockent_t *se_tail = NULL;
1049
1050         DEBUG ("node = %s, service = %s", node, service);
1051
1052         memset (&ai_hints, '\0', sizeof (ai_hints));
1053         ai_hints.ai_flags    = 0;
1054 #ifdef AI_PASSIVE
1055         ai_hints.ai_flags |= AI_PASSIVE;
1056 #endif
1057 #ifdef AI_ADDRCONFIG
1058         ai_hints.ai_flags |= AI_ADDRCONFIG;
1059 #endif
1060         ai_hints.ai_family   = AF_UNSPEC;
1061         ai_hints.ai_socktype = SOCK_DGRAM;
1062         ai_hints.ai_protocol = IPPROTO_UDP;
1063
1064         ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
1065         if (ai_return != 0)
1066         {
1067                 char errbuf[1024];
1068                 ERROR ("getaddrinfo (%s, %s): %s",
1069                                 (node == NULL) ? "(null)" : node,
1070                                 (service == NULL) ? "(null)" : service,
1071                                 (ai_return == EAI_SYSTEM)
1072                                 ? sstrerror (errno, errbuf, sizeof (errbuf))
1073                                 : gai_strerror (ai_return));
1074                 return (NULL);
1075         }
1076
1077         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1078         {
1079                 sockent_t *se;
1080
1081                 if ((se = (sockent_t *) malloc (sizeof (sockent_t))) == NULL)
1082                 {
1083                         char errbuf[1024];
1084                         ERROR ("malloc: %s",
1085                                         sstrerror (errno, errbuf,
1086                                                 sizeof (errbuf)));
1087                         continue;
1088                 }
1089
1090                 if ((se->addr = (struct sockaddr_storage *) malloc (sizeof (struct sockaddr_storage))) == NULL)
1091                 {
1092                         char errbuf[1024];
1093                         ERROR ("malloc: %s",
1094                                         sstrerror (errno, errbuf,
1095                                                 sizeof (errbuf)));
1096                         free (se);
1097                         continue;
1098                 }
1099
1100                 assert (sizeof (struct sockaddr_storage) >= ai_ptr->ai_addrlen);
1101                 memset (se->addr, '\0', sizeof (struct sockaddr_storage));
1102                 memcpy (se->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1103                 se->addrlen = ai_ptr->ai_addrlen;
1104
1105                 se->fd   = socket (ai_ptr->ai_family,
1106                                 ai_ptr->ai_socktype,
1107                                 ai_ptr->ai_protocol);
1108                 se->next = NULL;
1109
1110                 if (se->fd == -1)
1111                 {
1112                         char errbuf[1024];
1113                         ERROR ("socket: %s",
1114                                         sstrerror (errno, errbuf,
1115                                                 sizeof (errbuf)));
1116                         free (se->addr);
1117                         free (se);
1118                         continue;
1119                 }
1120
1121                 if (listen != 0)
1122                 {
1123                         if (network_bind_socket (se, ai_ptr) != 0)
1124                         {
1125                                 close (se->fd);
1126                                 free (se->addr);
1127                                 free (se);
1128                                 continue;
1129                         }
1130                 }
1131                 else /* listen == 0 */
1132                 {
1133                         network_set_ttl (se, ai_ptr);
1134                 }
1135
1136                 if (se_tail == NULL)
1137                 {
1138                         se_head = se;
1139                         se_tail = se;
1140                 }
1141                 else
1142                 {
1143                         se_tail->next = se;
1144                         se_tail = se;
1145                 }
1146
1147                 /* We don't open more than one write-socket per node/service pair.. */
1148                 if (listen == 0)
1149                         break;
1150         }
1151
1152         freeaddrinfo (ai_list);
1153
1154         return (se_head);
1155 } /* sockent_t *network_create_socket */
1156
1157 static sockent_t *network_create_default_socket (int listen)
1158 {
1159         sockent_t *se_ptr  = NULL;
1160         sockent_t *se_head = NULL;
1161         sockent_t *se_tail = NULL;
1162
1163         se_ptr = network_create_socket (NET_DEFAULT_V6_ADDR,
1164                         NET_DEFAULT_PORT, listen);
1165
1166         /* Don't send to the same machine in IPv6 and IPv4 if both are available. */
1167         if ((listen == 0) && (se_ptr != NULL))
1168                 return (se_ptr);
1169
1170         if (se_ptr != NULL)
1171         {
1172                 se_head = se_ptr;
1173                 se_tail = se_ptr;
1174                 while (se_tail->next != NULL)
1175                         se_tail = se_tail->next;
1176         }
1177
1178         se_ptr = network_create_socket (NET_DEFAULT_V4_ADDR, NET_DEFAULT_PORT, listen);
1179
1180         if (se_tail == NULL)
1181                 return (se_ptr);
1182
1183         se_tail->next = se_ptr;
1184         return (se_head);
1185 } /* sockent_t *network_create_default_socket */
1186
1187 static int network_add_listen_socket (const char *node, const char *service)
1188 {
1189         sockent_t *se;
1190         sockent_t *se_ptr;
1191         int se_num = 0;
1192
1193         if (service == NULL)
1194                 service = NET_DEFAULT_PORT;
1195
1196         if (node == NULL)
1197                 se = network_create_default_socket (1 /* listen == true */);
1198         else
1199                 se = network_create_socket (node, service, 1 /* listen == true */);
1200
1201         if (se == NULL)
1202                 return (-1);
1203
1204         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
1205                 se_num++;
1206
1207         listen_sockets = (struct pollfd *) realloc (listen_sockets,
1208                         (listen_sockets_num + se_num)
1209                         * sizeof (struct pollfd));
1210
1211         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
1212         {
1213                 listen_sockets[listen_sockets_num].fd = se_ptr->fd;
1214                 listen_sockets[listen_sockets_num].events = POLLIN | POLLPRI;
1215                 listen_sockets[listen_sockets_num].revents = 0;
1216                 listen_sockets_num++;
1217         } /* for (se) */
1218
1219         free_sockent (se);
1220         return (0);
1221 } /* int network_add_listen_socket */
1222
1223 static int network_add_sending_socket (const char *node, const char *service)
1224 {
1225         sockent_t *se;
1226         sockent_t *se_ptr;
1227
1228         if (service == NULL)
1229                 service = NET_DEFAULT_PORT;
1230
1231         if (node == NULL)
1232                 se = network_create_default_socket (0 /* listen == false */);
1233         else
1234                 se = network_create_socket (node, service, 0 /* listen == false */);
1235
1236         if (se == NULL)
1237                 return (-1);
1238
1239         if (sending_sockets == NULL)
1240         {
1241                 sending_sockets = se;
1242                 return (0);
1243         }
1244
1245         for (se_ptr = sending_sockets; se_ptr->next != NULL; se_ptr = se_ptr->next)
1246                 /* seek end */;
1247
1248         se_ptr->next = se;
1249         return (0);
1250 } /* int network_get_listen_socket */
1251
1252 static void *dispatch_thread (void __attribute__((unused)) *arg)
1253 {
1254   while (42)
1255   {
1256     receive_list_entry_t *ent;
1257
1258     /* Lock and wait for more data to come in */
1259     pthread_mutex_lock (&receive_list_lock);
1260     while ((listen_loop == 0)
1261         && (receive_list_head == NULL))
1262       pthread_cond_wait (&receive_list_cond, &receive_list_lock);
1263
1264     /* Remove the head entry and unlock */
1265     ent = receive_list_head;
1266     if (ent != NULL)
1267       receive_list_head = ent->next;
1268     pthread_mutex_unlock (&receive_list_lock);
1269
1270     /* Check whether we are supposed to exit. We do NOT check `listen_loop'
1271      * because we dispatch all missing packets before shutting down. */
1272     if (ent == NULL)
1273       break;
1274
1275     parse_packet (ent->data, ent->data_len);
1276
1277     sfree (ent);
1278   } /* while (42) */
1279
1280   return (NULL);
1281 } /* void *receive_thread */
1282
1283 static int network_receive (void)
1284 {
1285         char buffer[BUFF_SIZE];
1286         int  buffer_len;
1287
1288         int i;
1289         int status;
1290
1291         receive_list_entry_t *private_list_head;
1292         receive_list_entry_t *private_list_tail;
1293
1294         if (listen_sockets_num == 0)
1295                 network_add_listen_socket (NULL, NULL);
1296
1297         if (listen_sockets_num == 0)
1298         {
1299                 ERROR ("network: Failed to open a listening socket.");
1300                 return (-1);
1301         }
1302
1303         private_list_head = NULL;
1304         private_list_tail = NULL;
1305
1306         while (listen_loop == 0)
1307         {
1308                 status = poll (listen_sockets, listen_sockets_num, -1);
1309
1310                 if (status <= 0)
1311                 {
1312                         char errbuf[1024];
1313                         if (errno == EINTR)
1314                                 continue;
1315                         ERROR ("poll failed: %s",
1316                                         sstrerror (errno, errbuf, sizeof (errbuf)));
1317                         return (-1);
1318                 }
1319
1320                 for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
1321                 {
1322                         receive_list_entry_t *ent;
1323
1324                         if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
1325                                 continue;
1326                         status--;
1327
1328                         buffer_len = recv (listen_sockets[i].fd,
1329                                         buffer, sizeof (buffer),
1330                                         0 /* no flags */);
1331                         if (buffer_len < 0)
1332                         {
1333                                 char errbuf[1024];
1334                                 ERROR ("recv failed: %s",
1335                                                 sstrerror (errno, errbuf,
1336                                                         sizeof (errbuf)));
1337                                 return (-1);
1338                         }
1339
1340                         /* TODO: Possible performance enhancement: Do not free
1341                          * these entries in the dispatch thread but put them in
1342                          * another list, so we don't have to allocate more and
1343                          * more of these structures. */
1344                         ent = malloc (sizeof (receive_list_entry_t));
1345                         if (ent == NULL)
1346                         {
1347                                 ERROR ("network plugin: malloc failed.");
1348                                 return (-1);
1349                         }
1350                         memset (ent, 0, sizeof (receive_list_entry_t));
1351                         ent->next = NULL;
1352
1353                         /* Hopefully this be optimized out by the compiler. It
1354                          * might help prevent stupid bugs in the future though.
1355                          */
1356                         assert (sizeof (ent->data) == sizeof (buffer));
1357
1358                         memcpy (ent->data, buffer, buffer_len);
1359                         ent->data_len = buffer_len;
1360
1361                         if (private_list_head == NULL)
1362                                 private_list_head = ent;
1363                         else
1364                                 private_list_tail->next = ent;
1365                         private_list_tail = ent;
1366
1367                         /* Do not block here. Blocking here has led to
1368                          * insufficient performance in the past. */
1369                         if (pthread_mutex_trylock (&receive_list_lock) == 0)
1370                         {
1371                                 if (receive_list_head == NULL)
1372                                         receive_list_head = private_list_head;
1373                                 else
1374                                         receive_list_tail->next = private_list_head;
1375                                 receive_list_tail = private_list_tail;
1376
1377                                 private_list_head = NULL;
1378                                 private_list_tail = NULL;
1379
1380                                 pthread_cond_signal (&receive_list_cond);
1381                                 pthread_mutex_unlock (&receive_list_lock);
1382                         }
1383                 } /* for (listen_sockets) */
1384         } /* while (listen_loop == 0) */
1385
1386         /* Make sure everything is dispatched before exiting. */
1387         if (private_list_head != NULL)
1388         {
1389                 pthread_mutex_lock (&receive_list_lock);
1390
1391                 if (receive_list_head == NULL)
1392                         receive_list_head = private_list_head;
1393                 else
1394                         receive_list_tail->next = private_list_head;
1395                 receive_list_tail = private_list_tail;
1396
1397                 private_list_head = NULL;
1398                 private_list_tail = NULL;
1399
1400                 pthread_cond_signal (&receive_list_cond);
1401                 pthread_mutex_unlock (&receive_list_lock);
1402         }
1403
1404         return (0);
1405 }
1406
1407 static void *receive_thread (void __attribute__((unused)) *arg)
1408 {
1409         return (network_receive () ? (void *) 1 : (void *) 0);
1410 } /* void *receive_thread */
1411
1412 static void network_send_buffer (const char *buffer, int buffer_len)
1413 {
1414         sockent_t *se;
1415         int status;
1416
1417         DEBUG ("network plugin: network_send_buffer: buffer_len = %i", buffer_len);
1418
1419         for (se = sending_sockets; se != NULL; se = se->next)
1420         {
1421                 while (42)
1422                 {
1423                         status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */,
1424                                         (struct sockaddr *) se->addr, se->addrlen);
1425                         if (status < 0)
1426                         {
1427                                 char errbuf[1024];
1428                                 if (errno == EINTR)
1429                                         continue;
1430                                 ERROR ("network plugin: sendto failed: %s",
1431                                                 sstrerror (errno, errbuf,
1432                                                         sizeof (errbuf)));
1433                                 break;
1434                         }
1435
1436                         break;
1437                 } /* while (42) */
1438         } /* for (sending_sockets) */
1439 } /* void network_send_buffer */
1440
1441 static int add_to_buffer (char *buffer, int buffer_size,
1442                 value_list_t *vl_def,
1443                 const data_set_t *ds, const value_list_t *vl)
1444 {
1445         char *buffer_orig = buffer;
1446
1447         if (strcmp (vl_def->host, vl->host) != 0)
1448         {
1449                 if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
1450                                         vl->host, strlen (vl->host)) != 0)
1451                         return (-1);
1452                 sstrncpy (vl_def->host, vl->host, sizeof (vl_def->host));
1453         }
1454
1455         if (vl_def->time != vl->time)
1456         {
1457                 if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
1458                                         (uint64_t) vl->time))
1459                         return (-1);
1460                 vl_def->time = vl->time;
1461         }
1462
1463         if (vl_def->interval != vl->interval)
1464         {
1465                 if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
1466                                         (uint64_t) vl->interval))
1467                         return (-1);
1468                 vl_def->interval = vl->interval;
1469         }
1470
1471         if (strcmp (vl_def->plugin, vl->plugin) != 0)
1472         {
1473                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
1474                                         vl->plugin, strlen (vl->plugin)) != 0)
1475                         return (-1);
1476                 sstrncpy (vl_def->plugin, vl->plugin, sizeof (vl_def->plugin));
1477         }
1478
1479         if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
1480         {
1481                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
1482                                         vl->plugin_instance,
1483                                         strlen (vl->plugin_instance)) != 0)
1484                         return (-1);
1485                 sstrncpy (vl_def->plugin_instance, vl->plugin_instance, sizeof (vl_def->plugin_instance));
1486         }
1487
1488         if (strcmp (vl_def->type, vl->type) != 0)
1489         {
1490                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
1491                                         vl->type, strlen (vl->type)) != 0)
1492                         return (-1);
1493                 sstrncpy (vl_def->type, ds->type, sizeof (vl_def->type));
1494         }
1495
1496         if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
1497         {
1498                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
1499                                         vl->type_instance,
1500                                         strlen (vl->type_instance)) != 0)
1501                         return (-1);
1502                 sstrncpy (vl_def->type_instance, vl->type_instance, sizeof (vl_def->type_instance));
1503         }
1504         
1505         if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
1506                 return (-1);
1507
1508         return (buffer - buffer_orig);
1509 } /* int add_to_buffer */
1510
1511 static void flush_buffer (void)
1512 {
1513         DEBUG ("network plugin: flush_buffer: send_buffer_fill = %i",
1514                         send_buffer_fill);
1515
1516         network_send_buffer (send_buffer, send_buffer_fill);
1517         send_buffer_ptr  = send_buffer;
1518         send_buffer_fill = 0;
1519         memset (&send_buffer_vl, 0, sizeof (send_buffer_vl));
1520 }
1521
1522 static int network_write (const data_set_t *ds, const value_list_t *vl,
1523                 user_data_t __attribute__((unused)) *user_data)
1524 {
1525         int status;
1526
1527         /* If the value is already in the cache, we have received it via the
1528          * network. We write it again if forwarding is activated. It's then in
1529          * the cache and should we receive it again we will ignore it. */
1530         status = cache_check (vl);
1531         if ((network_config_forward == 0)
1532                         && (status != 0))
1533                 return (0);
1534
1535         pthread_mutex_lock (&send_buffer_lock);
1536
1537         status = add_to_buffer (send_buffer_ptr,
1538                         sizeof (send_buffer) - send_buffer_fill,
1539                         &send_buffer_vl,
1540                         ds, vl);
1541         if (status >= 0)
1542         {
1543                 /* status == bytes added to the buffer */
1544                 send_buffer_fill += status;
1545                 send_buffer_ptr  += status;
1546         }
1547         else
1548         {
1549                 flush_buffer ();
1550
1551                 status = add_to_buffer (send_buffer_ptr,
1552                                 sizeof (send_buffer) - send_buffer_fill,
1553                                 &send_buffer_vl,
1554                                 ds, vl);
1555
1556                 if (status >= 0)
1557                 {
1558                         send_buffer_fill += status;
1559                         send_buffer_ptr  += status;
1560                 }
1561         }
1562
1563         if (status < 0)
1564         {
1565                 ERROR ("network plugin: Unable to append to the "
1566                                 "buffer for some weird reason");
1567         }
1568         else if ((sizeof (send_buffer) - send_buffer_fill) < 15)
1569         {
1570                 flush_buffer ();
1571         }
1572
1573         pthread_mutex_unlock (&send_buffer_lock);
1574
1575         return ((status < 0) ? -1 : 0);
1576 } /* int network_write */
1577
1578 static int network_config (const char *key, const char *val)
1579 {
1580         char *node;
1581         char *service;
1582
1583         char *fields[3];
1584         int   fields_num;
1585
1586         if ((strcasecmp ("Listen", key) == 0)
1587                         || (strcasecmp ("Server", key) == 0))
1588         {
1589                 char *val_cpy = strdup (val);
1590                 if (val_cpy == NULL)
1591                         return (1);
1592
1593                 service = NET_DEFAULT_PORT;
1594                 fields_num = strsplit (val_cpy, fields, 3);
1595                 if ((fields_num != 1)
1596                                 && (fields_num != 2))
1597                 {
1598                         sfree (val_cpy);
1599                         return (1);
1600                 }
1601                 else if (fields_num == 2)
1602                 {
1603                         if ((service = strchr (fields[1], '.')) != NULL)
1604                                 *service = '\0';
1605                         service = fields[1];
1606                 }
1607                 node = fields[0];
1608
1609                 if (strcasecmp ("Listen", key) == 0)
1610                         network_add_listen_socket (node, service);
1611                 else
1612                         network_add_sending_socket (node, service);
1613
1614                 sfree (val_cpy);
1615         }
1616         else if (strcasecmp ("TimeToLive", key) == 0)
1617         {
1618                 int tmp = atoi (val);
1619                 if ((tmp > 0) && (tmp < 256))
1620                         network_config_ttl = tmp;
1621                 else
1622                         return (1);
1623         }
1624         else if (strcasecmp ("Forward", key) == 0)
1625         {
1626                 if ((strcasecmp ("true", val) == 0)
1627                                 || (strcasecmp ("yes", val) == 0)
1628                                 || (strcasecmp ("on", val) == 0))
1629                         network_config_forward = 1;
1630                 else
1631                         network_config_forward = 0;
1632         }
1633         else if (strcasecmp ("CacheFlush", key) == 0)
1634         {
1635                 int tmp = atoi (val);
1636                 if (tmp > 0)
1637                         cache_flush_interval = tmp;
1638                 else return (1);
1639         }
1640         else
1641         {
1642                 return (-1);
1643         }
1644         return (0);
1645 } /* int network_config */
1646
1647 static int network_notification (const notification_t *n,
1648                 user_data_t __attribute__((unused)) *user_data)
1649 {
1650   char  buffer[BUFF_SIZE];
1651   char *buffer_ptr = buffer;
1652   int   buffer_free = sizeof (buffer);
1653   int   status;
1654
1655   memset (buffer, '\0', sizeof (buffer));
1656
1657
1658   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME,
1659       (uint64_t) n->time);
1660   if (status != 0)
1661     return (-1);
1662
1663   status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY,
1664       (uint64_t) n->severity);
1665   if (status != 0)
1666     return (-1);
1667
1668   if (strlen (n->host) > 0)
1669   {
1670     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST,
1671         n->host, strlen (n->host));
1672     if (status != 0)
1673       return (-1);
1674   }
1675
1676   if (strlen (n->plugin) > 0)
1677   {
1678     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN,
1679         n->plugin, strlen (n->plugin));
1680     if (status != 0)
1681       return (-1);
1682   }
1683
1684   if (strlen (n->plugin_instance) > 0)
1685   {
1686     status = write_part_string (&buffer_ptr, &buffer_free,
1687         TYPE_PLUGIN_INSTANCE,
1688         n->plugin_instance, strlen (n->plugin_instance));
1689     if (status != 0)
1690       return (-1);
1691   }
1692
1693   if (strlen (n->type) > 0)
1694   {
1695     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE,
1696         n->type, strlen (n->type));
1697     if (status != 0)
1698       return (-1);
1699   }
1700
1701   if (strlen (n->type_instance) > 0)
1702   {
1703     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE,
1704         n->type_instance, strlen (n->type_instance));
1705     if (status != 0)
1706       return (-1);
1707   }
1708
1709   status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE,
1710       n->message, strlen (n->message));
1711   if (status != 0)
1712     return (-1);
1713
1714   network_send_buffer (buffer, sizeof (buffer) - buffer_free);
1715
1716   return (0);
1717 } /* int network_notification */
1718
1719 static int network_shutdown (void)
1720 {
1721         listen_loop++;
1722
1723         /* Kill the listening thread */
1724         if (receive_thread_running != 0)
1725         {
1726                 INFO ("network plugin: Stopping receive thread.");
1727                 pthread_kill (receive_thread_id, SIGTERM);
1728                 pthread_join (receive_thread_id, NULL /* no return value */);
1729                 memset (&receive_thread_id, 0, sizeof (receive_thread_id));
1730                 receive_thread_running = 0;
1731         }
1732
1733         /* Shutdown the dispatching thread */
1734         if (dispatch_thread_running != 0)
1735         {
1736                 INFO ("network plugin: Stopping dispatch thread.");
1737                 pthread_join (dispatch_thread_id, /* ret = */ NULL);
1738                 pthread_cond_broadcast (&receive_list_cond);
1739                 dispatch_thread_running = 0;
1740         }
1741
1742         if (send_buffer_fill > 0)
1743                 flush_buffer ();
1744
1745         if (cache_tree != NULL)
1746         {
1747                 void *key;
1748                 void *value;
1749
1750                 while (c_avl_pick (cache_tree, &key, &value) == 0)
1751                 {
1752                         sfree (key);
1753                         sfree (value);
1754                 }
1755                 c_avl_destroy (cache_tree);
1756                 cache_tree = NULL;
1757         }
1758
1759         /* TODO: Close `sending_sockets' */
1760
1761         plugin_unregister_config ("network");
1762         plugin_unregister_init ("network");
1763         plugin_unregister_write ("network");
1764         plugin_unregister_shutdown ("network");
1765
1766         /* Let the init function do it's move again ;) */
1767         cache_flush_last = 0;
1768
1769         return (0);
1770 } /* int network_shutdown */
1771
1772 static int network_init (void)
1773 {
1774         /* Check if we were already initialized. If so, just return - there's
1775          * nothing more to do (for now, that is). */
1776         if (cache_flush_last != 0)
1777                 return (0);
1778
1779         plugin_register_shutdown ("network", network_shutdown);
1780
1781         send_buffer_ptr  = send_buffer;
1782         send_buffer_fill = 0;
1783         memset (&send_buffer_vl, 0, sizeof (send_buffer_vl));
1784
1785         cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1786         cache_flush_last = time (NULL);
1787
1788         /* setup socket(s) and so on */
1789         if (sending_sockets != NULL)
1790         {
1791                 plugin_register_write ("network", network_write,
1792                                 /* user_data = */ NULL);
1793                 plugin_register_notification ("network", network_notification,
1794                                 /* user_data = */ NULL);
1795         }
1796
1797         /* If no threads need to be started, return here. */
1798         if ((listen_sockets_num == 0)
1799                         || ((dispatch_thread_running != 0)
1800                                 && (receive_thread_running != 0)))
1801                 return (0);
1802
1803         if (dispatch_thread_running == 0)
1804         {
1805                 int status;
1806                 status = pthread_create (&dispatch_thread_id,
1807                                 NULL /* no attributes */,
1808                                 dispatch_thread,
1809                                 NULL /* no argument */);
1810                 if (status != 0)
1811                 {
1812                         char errbuf[1024];
1813                         ERROR ("network: pthread_create failed: %s",
1814                                         sstrerror (errno, errbuf,
1815                                                 sizeof (errbuf)));
1816                 }
1817                 else
1818                 {
1819                         dispatch_thread_running = 1;
1820                 }
1821         }
1822
1823         if (receive_thread_running == 0)
1824         {
1825                 int status;
1826                 status = pthread_create (&receive_thread_id,
1827                                 NULL /* no attributes */,
1828                                 receive_thread,
1829                                 NULL /* no argument */);
1830                 if (status != 0)
1831                 {
1832                         char errbuf[1024];
1833                         ERROR ("network: pthread_create failed: %s",
1834                                         sstrerror (errno, errbuf,
1835                                                 sizeof (errbuf)));
1836                 }
1837                 else
1838                 {
1839                         receive_thread_running = 1;
1840                 }
1841         }
1842
1843         return (0);
1844 } /* int network_init */
1845
1846 /* 
1847  * The flush option of the network plugin cannot flush individual identifiers.
1848  * All the values are added to a buffer and sent when the buffer is full, the
1849  * requested value may or may not be in there, it's not worth finding out. We
1850  * just send the buffer if `flush'  is called - if the requested value was in
1851  * there, good. If not, well, then there is nothing to flush.. -octo
1852  */
1853 static int network_flush (int timeout,
1854                 const char __attribute__((unused)) *identifier,
1855                 user_data_t __attribute__((unused)) *user_data)
1856 {
1857         pthread_mutex_lock (&send_buffer_lock);
1858
1859         if (((time (NULL) - cache_flush_last) >= timeout)
1860                         && (send_buffer_fill > 0))
1861         {
1862                 flush_buffer ();
1863         }
1864
1865         pthread_mutex_unlock (&send_buffer_lock);
1866
1867         return (0);
1868 } /* int network_flush */
1869
1870 void module_register (void)
1871 {
1872         plugin_register_config ("network", network_config,
1873                         config_keys, config_keys_num);
1874         plugin_register_init   ("network", network_init);
1875         plugin_register_flush   ("network", network_flush,
1876                         /* user_data = */ NULL);
1877 } /* void module_register */