Merge branch 'ff/avl-tree'
[collectd.git] / src / network.c
1 /**
2  * collectd - src/network.c
3  * Copyright (C) 2005-2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_debug.h"
27
28 #include "network.h"
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33 #if HAVE_SYS_SOCKET_H
34 # include <sys/socket.h>
35 #endif
36 #if HAVE_NETDB_H
37 # include <netdb.h>
38 #endif
39 #if HAVE_NETINET_IN_H
40 # include <netinet/in.h>
41 #endif
42 #if HAVE_ARPA_INET_H
43 # include <arpa/inet.h>
44 #endif
45 #if HAVE_POLL_H
46 # include <poll.h>
47 #endif
48
49 /* 1500 - 40 - 8  =  Ethernet packet - IPv6 header - UDP header */
50 /* #define BUFF_SIZE 1452 */
51
52 #ifndef IPV6_ADD_MEMBERSHIP
53 # ifdef IPV6_JOIN_GROUP
54 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
55 # else
56 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
57 # endif
58 #endif /* !IP_ADD_MEMBERSHIP */
59
60 #define BUFF_SIZE 1024
61
62 /*
63  * Private data types
64  */
65 typedef struct sockent
66 {
67         int                      fd;
68         struct sockaddr_storage *addr;
69         socklen_t                addrlen;
70         struct sockent          *next;
71 } sockent_t;
72
73 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
74  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
75  * +-------+-----------------------+-------------------------------+
76  * ! Ver.  !                       ! Length                        !
77  * +-------+-----------------------+-------------------------------+
78  */
79 struct part_header_s
80 {
81         uint16_t type;
82         uint16_t length;
83 };
84 typedef struct part_header_s part_header_t;
85
86 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
87  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
88  * +-------------------------------+-------------------------------+
89  * ! Type                          ! Length                        !
90  * +-------------------------------+-------------------------------+
91  * : (Length - 4) Bytes                                            :
92  * +---------------------------------------------------------------+
93  */
94 struct part_string_s
95 {
96         part_header_t *head;
97         char *value;
98 };
99 typedef struct part_string_s part_string_t;
100
101 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
102  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
103  * +-------------------------------+-------------------------------+
104  * ! Type                          ! Length                        !
105  * +-------------------------------+-------------------------------+
106  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
107  * +---------------------------------------------------------------+
108  */
109 struct part_number_s
110 {
111         part_header_t *head;
112         uint64_t *value;
113 };
114 typedef struct part_number_s part_number_t;
115
116 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
117  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
118  * +-------------------------------+-------------------------------+
119  * ! Type                          ! Length                        !
120  * +-------------------------------+---------------+---------------+
121  * ! Num of values                 ! Type0         ! Type1         !
122  * +-------------------------------+---------------+---------------+
123  * ! Value0                                                        !
124  * !                                                               !
125  * +---------------------------------------------------------------+
126  * ! Value1                                                        !
127  * !                                                               !
128  * +---------------------------------------------------------------+
129  */
130 struct part_values_s
131 {
132         part_header_t *head;
133         uint16_t *num_values;
134         uint8_t  *values_types;
135         value_t  *values;
136 };
137 typedef struct part_values_s part_values_t;
138
139 /*
140  * Private variables
141  */
142 static const char *config_keys[] =
143 {
144         "Listen",
145         "Server",
146         "TimeToLive",
147         NULL
148 };
149 static int config_keys_num = 3;
150
151 static int network_config_ttl = 0;
152
153 static sockent_t *sending_sockets = NULL;
154
155 static struct pollfd *listen_sockets = NULL;
156 static int listen_sockets_num = 0;
157 static pthread_t listen_thread = 0;
158 static int listen_loop = 0;
159
160 static char         send_buffer[BUFF_SIZE];
161 static char        *send_buffer_ptr;
162 static int          send_buffer_fill;
163 static value_list_t send_buffer_vl = VALUE_LIST_INIT;
164 static char         send_buffer_type[DATA_MAX_NAME_LEN];
165
166 /*
167  * Private functions
168  */
169 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
170                 const data_set_t *ds, const value_list_t *vl)
171 {
172         part_values_t pv;
173         int i;
174
175         i = 6 + (9 * vl->values_len);
176         if (*ret_buffer_len < i)
177                 return (-1);
178         *ret_buffer_len -= i;
179
180         pv.head = (part_header_t *) *ret_buffer;
181         pv.num_values = (uint16_t *) (pv.head + 1);
182         pv.values_types = (uint8_t *) (pv.num_values + 1);
183         pv.values = (value_t *) (pv.values_types + vl->values_len);
184         *ret_buffer = (void *) (pv.values + vl->values_len);
185
186         pv.head->type = htons (TYPE_VALUES);
187         pv.head->length = htons (6 + (9 * vl->values_len));
188         *pv.num_values = htons ((uint16_t) vl->values_len);
189         
190         for (i = 0; i < vl->values_len; i++)
191         {
192                 if (ds->ds[i].type == DS_TYPE_COUNTER)
193                 {
194                         pv.values_types[i] = DS_TYPE_COUNTER;
195                         pv.values[i].counter = htonll (vl->values[i].counter);
196                 }
197                 else
198                 {
199                         pv.values_types[i] = DS_TYPE_GAUGE;
200                         pv.values[i].gauge = vl->values[i].gauge;
201                 }
202         } /* for (values) */
203
204         return (0);
205 } /* int write_part_values */
206
207 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
208                 int type, uint64_t value)
209 {
210         part_number_t pn;
211
212         if (*ret_buffer_len < 12)
213                 return (-1);
214
215         pn.head = (part_header_t *) *ret_buffer;
216         pn.value = (uint64_t *) (pn.head + 1);
217
218         pn.head->type = htons (type);
219         pn.head->length = htons (12);
220         *pn.value = htonll (value);
221
222         *ret_buffer = (char *) (pn.value + 1);
223         *ret_buffer_len -= 12;
224
225         return (0);
226 } /* int write_part_number */
227
228 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
229                 int type, const char *str, int str_len)
230 {
231         part_string_t ps;
232         int len;
233
234         len = 4 + str_len + 1;
235         if (*ret_buffer_len < len)
236                 return (-1);
237         *ret_buffer_len -= len;
238
239         ps.head = (part_header_t *) *ret_buffer;
240         ps.value = (char *) (ps.head + 1);
241
242         ps.head->type = htons ((uint16_t) type);
243         ps.head->length = htons ((uint16_t) str_len + 5);
244         if (str_len > 0)
245                 memcpy (ps.value, str, str_len);
246         ps.value[str_len] = '\0';
247         *ret_buffer = (void *) (ps.value + (str_len + 1));
248
249         return (0);
250 } /* int write_part_string */
251
252 static int parse_part_values (void **ret_buffer, int *ret_buffer_len,
253                 value_t **ret_values, int *ret_num_values)
254 {
255         char *buffer = *ret_buffer;
256         int   buffer_len = *ret_buffer_len;
257         part_values_t pv;
258         int   i;
259
260         uint16_t h_length;
261         uint16_t h_type;
262         uint16_t h_num;
263
264         if (buffer_len < (15))
265         {
266                 DBG ("packet is too short: buffer_len = %i", buffer_len);
267                 return (-1);
268         }
269
270         pv.head = (part_header_t *) buffer;
271         h_length = ntohs (pv.head->length);
272         h_type = ntohs (pv.head->type);
273
274         assert (h_type == TYPE_VALUES);
275
276         pv.num_values = (uint16_t *) (pv.head + 1);
277         h_num = ntohs (*pv.num_values);
278
279         if (h_num != ((h_length - 6) / 9))
280         {
281                 DBG ("`length' and `num of values' don't match");
282                 return (-1);
283         }
284
285         pv.values_types = (uint8_t *) (pv.num_values + 1);
286         pv.values = (value_t *) (pv.values_types + h_num);
287
288         for (i = 0; i < h_num; i++)
289                 if (pv.values_types[i] == DS_TYPE_COUNTER)
290                         pv.values[i].counter = ntohll (pv.values[i].counter);
291
292         *ret_buffer     = (void *) (pv.values + h_num);
293         *ret_buffer_len = buffer_len - h_length;
294         *ret_num_values = h_num;
295         *ret_values     = pv.values;
296
297         return (0);
298 } /* int parse_part_values */
299
300 static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
301                 uint64_t *value)
302 {
303         part_number_t pn;
304         uint16_t len;
305
306         pn.head = (part_header_t *) *ret_buffer;
307         pn.value = (uint64_t *) (pn.head + 1);
308
309         len = ntohs (pn.head->length);
310         if (len != 12)
311                 return (-1);
312         if (len > *ret_buffer_len)
313                 return (-1);
314         *value = ntohll (*pn.value);
315
316         *ret_buffer = (void *) (pn.value + 1);
317         *ret_buffer_len -= len;
318
319         return (0);
320 } /* int parse_part_number */
321
322 static int parse_part_string (void **ret_buffer, int *ret_buffer_len,
323                 char *output, int output_len)
324 {
325         char *buffer = *ret_buffer;
326         int   buffer_len = *ret_buffer_len;
327         part_string_t ps;
328
329         uint16_t h_length;
330         uint16_t h_type;
331
332         DBG ("ret_buffer = %p; ret_buffer_len = %i; output = %p; output_len = %i;",
333                         *ret_buffer, *ret_buffer_len,
334                         (void *) output, output_len);
335
336         ps.head = (part_header_t *) buffer;
337
338         h_length = ntohs (ps.head->length);
339         h_type = ntohs (ps.head->type);
340
341         DBG ("length = %hu; type = %hu;", h_length, h_type);
342
343         if (buffer_len < h_length)
344         {
345                 DBG ("packet is too short");
346                 return (-1);
347         }
348         assert ((h_type == TYPE_HOST)
349                         || (h_type == TYPE_PLUGIN)
350                         || (h_type == TYPE_PLUGIN_INSTANCE)
351                         || (h_type == TYPE_TYPE)
352                         || (h_type == TYPE_TYPE_INSTANCE));
353
354         ps.value = buffer + 4;
355         if (ps.value[h_length - 5] != '\0')
356         {
357                 DBG ("String does not end with a nullbyte");
358                 return (-1);
359         }
360
361         if (output_len < (h_length - 4))
362         {
363                 DBG ("output buffer is too small");
364                 return (-1);
365         }
366         strcpy (output, ps.value);
367
368         DBG ("output = %s", output);
369
370         *ret_buffer = (void *) (buffer + h_length);
371         *ret_buffer_len = buffer_len - h_length;
372
373         return (0);
374 } /* int parse_part_string */
375
376 static int parse_packet (void *buffer, int buffer_len)
377 {
378         part_header_t *header;
379         int status;
380
381         value_list_t vl = VALUE_LIST_INIT;
382         char type[DATA_MAX_NAME_LEN];
383
384         DBG ("buffer = %p; buffer_len = %i;", buffer, buffer_len);
385
386         memset (&vl, '\0', sizeof (vl));
387         memset (&type, '\0', sizeof (type));
388         status = 0;
389
390         while ((status == 0) && (buffer_len > sizeof (part_header_t)))
391         {
392                 header = (part_header_t *) buffer;
393
394                 if (ntohs (header->length) > buffer_len)
395                         break;
396
397                 if (header->type == htons (TYPE_VALUES))
398                 {
399                         status = parse_part_values (&buffer, &buffer_len,
400                                         &vl.values, &vl.values_len);
401
402                         if (status != 0)
403                         {
404                                 DBG ("parse_part_values failed.");
405                                 break;
406                         }
407
408                         if ((vl.time > 0)
409                                         && (strlen (vl.host) > 0)
410                                         && (strlen (vl.plugin) > 0)
411                                         && (strlen (type) > 0))
412                         {
413                                 DBG ("dispatching values");
414                                 plugin_dispatch_values (type, &vl);
415                         }
416                         else
417                         {
418                                 DBG ("NOT dispatching values");
419                         }
420                 }
421                 else if (header->type == ntohs (TYPE_TIME))
422                 {
423                         uint64_t tmp = 0;
424                         status = parse_part_number (&buffer, &buffer_len, &tmp);
425                         if (status == 0)
426                                 vl.time = (time_t) tmp;
427                 }
428                 else if (header->type == ntohs (TYPE_HOST))
429                 {
430                         status = parse_part_string (&buffer, &buffer_len,
431                                         vl.host, sizeof (vl.host));
432                 }
433                 else if (header->type == ntohs (TYPE_PLUGIN))
434                 {
435                         status = parse_part_string (&buffer, &buffer_len,
436                                         vl.plugin, sizeof (vl.plugin));
437                 }
438                 else if (header->type == ntohs (TYPE_PLUGIN_INSTANCE))
439                 {
440                         status = parse_part_string (&buffer, &buffer_len,
441                                         vl.plugin_instance, sizeof (vl.plugin_instance));
442                 }
443                 else if (header->type == ntohs (TYPE_TYPE))
444                 {
445                         status = parse_part_string (&buffer, &buffer_len,
446                                         type, sizeof (type));
447                 }
448                 else if (header->type == ntohs (TYPE_TYPE_INSTANCE))
449                 {
450                         status = parse_part_string (&buffer, &buffer_len,
451                                         vl.type_instance, sizeof (vl.type_instance));
452                 }
453                 else
454                 {
455                         DBG ("Unknown part type: 0x%0hx", header->type);
456                         buffer = ((char *) buffer) + header->length;
457                 }
458         } /* while (buffer_len > sizeof (part_header_t)) */
459
460         return (0);
461 } /* int parse_packet */
462
463 static void free_sockent (sockent_t *se)
464 {
465         sockent_t *next;
466         while (se != NULL)
467         {
468                 next = se->next;
469                 free (se->addr);
470                 free (se);
471                 se = next;
472         }
473 } /* void free_sockent */
474
475 /*
476  * int network_set_ttl
477  *
478  * Set the `IP_MULTICAST_TTL', `IP_TTL', `IPV6_MULTICAST_HOPS' or
479  * `IPV6_UNICAST_HOPS', depending on which option is applicable.
480  *
481  * The `struct addrinfo' is used to destinguish between unicast and multicast
482  * sockets.
483  */
484 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
485 {
486         if ((network_config_ttl < 1) || (network_config_ttl > 255))
487                 return (-1);
488
489         DBG ("ttl = %i", network_config_ttl);
490
491         if (ai->ai_family == AF_INET)
492         {
493                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
494                 int optname;
495
496                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
497                         optname = IP_MULTICAST_TTL;
498                 else
499                         optname = IP_TTL;
500
501                 if (setsockopt (se->fd, IPPROTO_IP, optname,
502                                         &network_config_ttl,
503                                         sizeof (network_config_ttl)) == -1)
504                 {
505                         syslog (LOG_ERR, "setsockopt: %s", strerror (errno));
506                         return (-1);
507                 }
508         }
509         else if (ai->ai_family == AF_INET6)
510         {
511                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
512                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
513                 int optname;
514
515                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
516                         optname = IPV6_MULTICAST_HOPS;
517                 else
518                         optname = IPV6_UNICAST_HOPS;
519
520                 if (setsockopt (se->fd, IPPROTO_IPV6, optname,
521                                         &network_config_ttl,
522                                         sizeof (network_config_ttl)) == -1)
523                 {
524                         syslog (LOG_ERR, "setsockopt: %s", strerror (errno));
525                         return (-1);
526                 }
527         }
528
529         return (0);
530 } /* int network_set_ttl */
531
532 static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
533 {
534         int loop = 1;
535
536         DBG ("fd = %i; calling `bind'", se->fd);
537
538         if (bind (se->fd, ai->ai_addr, ai->ai_addrlen) == -1)
539         {
540                 syslog (LOG_ERR, "bind: %s", strerror (errno));
541                 return (-1);
542         }
543
544         if (ai->ai_family == AF_INET)
545         {
546                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
547                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
548                 {
549                         struct ip_mreq mreq;
550
551                         DBG ("fd = %i; IPv4 multicast address found", se->fd);
552
553                         mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
554                         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
555
556                         if (setsockopt (se->fd, IPPROTO_IP, IP_MULTICAST_LOOP,
557                                                 &loop, sizeof (loop)) == -1)
558                         {
559                                 syslog (LOG_ERR, "setsockopt: %s", strerror (errno));
560                                 return (-1);
561                         }
562
563                         if (setsockopt (se->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
564                                                 &mreq, sizeof (mreq)) == -1)
565                         {
566                                 syslog (LOG_ERR, "setsockopt: %s", strerror (errno));
567                                 return (-1);
568                         }
569                 }
570         }
571         else if (ai->ai_family == AF_INET6)
572         {
573                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
574                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
575                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
576                 {
577                         struct ipv6_mreq mreq;
578
579                         DBG ("fd = %i; IPv6 multicast address found", se->fd);
580
581                         memcpy (&mreq.ipv6mr_multiaddr,
582                                         &addr->sin6_addr,
583                                         sizeof (addr->sin6_addr));
584
585                         /* http://developer.apple.com/documentation/Darwin/Reference/ManPages/man4/ip6.4.html
586                          * ipv6mr_interface may be set to zeroes to
587                          * choose the default multicast interface or to
588                          * the index of a particular multicast-capable
589                          * interface if the host is multihomed.
590                          * Membership is associ-associated with a
591                          * single interface; programs running on
592                          * multihomed hosts may need to join the same
593                          * group on more than one interface.*/
594                         mreq.ipv6mr_interface = 0;
595
596                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
597                                                 &loop, sizeof (loop)) == -1)
598                         {
599                                 syslog (LOG_ERR, "setsockopt: %s", strerror (errno));
600                                 return (-1);
601                         }
602
603                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
604                                                 &mreq, sizeof (mreq)) == -1)
605                         {
606                                 syslog (LOG_ERR, "setsockopt: %s", strerror (errno));
607                                 return (-1);
608                         }
609                 }
610         }
611
612         return (0);
613 } /* int network_bind_socket */
614
615 static sockent_t *network_create_socket (const char *node,
616                 const char *service,
617                 int listen)
618 {
619         struct addrinfo  ai_hints;
620         struct addrinfo *ai_list, *ai_ptr;
621         int              ai_return;
622
623         sockent_t *se_head = NULL;
624         sockent_t *se_tail = NULL;
625
626         DBG ("node = %s, service = %s", node, service);
627
628         memset (&ai_hints, '\0', sizeof (ai_hints));
629         ai_hints.ai_flags    = 0;
630 #ifdef AI_PASSIVE
631         ai_hints.ai_flags |= AI_PASSIVE;
632 #endif
633 #ifdef AI_ADDRCONFIG
634         ai_hints.ai_flags |= AI_ADDRCONFIG;
635 #endif
636         ai_hints.ai_family   = AF_UNSPEC;
637         ai_hints.ai_socktype = SOCK_DGRAM;
638         ai_hints.ai_protocol = IPPROTO_UDP;
639
640         ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
641         if (ai_return != 0)
642         {
643                 syslog (LOG_ERR, "getaddrinfo (%s, %s): %s",
644                                 (node == NULL) ? "(null)" : node,
645                                 (service == NULL) ? "(null)" : service,
646                                 (ai_return == EAI_SYSTEM)
647                                 ? strerror (errno)
648                                 : gai_strerror (ai_return));
649                 return (NULL);
650         }
651
652         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
653         {
654                 sockent_t *se;
655
656                 if ((se = (sockent_t *) malloc (sizeof (sockent_t))) == NULL)
657                 {
658                         syslog (LOG_EMERG, "malloc: %s", strerror (errno));
659                         continue;
660                 }
661
662                 if ((se->addr = (struct sockaddr_storage *) malloc (sizeof (struct sockaddr_storage))) == NULL)
663                 {
664                         syslog (LOG_EMERG, "malloc: %s", strerror (errno));
665                         free (se);
666                         continue;
667                 }
668
669                 assert (sizeof (struct sockaddr_storage) >= ai_ptr->ai_addrlen);
670                 memset (se->addr, '\0', sizeof (struct sockaddr_storage));
671                 memcpy (se->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
672                 se->addrlen = ai_ptr->ai_addrlen;
673
674                 se->fd   = socket (ai_ptr->ai_family,
675                                 ai_ptr->ai_socktype,
676                                 ai_ptr->ai_protocol);
677                 se->next = NULL;
678
679                 if (se->fd == -1)
680                 {
681                         syslog (LOG_ERR, "socket: %s", strerror (errno));
682                         free (se->addr);
683                         free (se);
684                         continue;
685                 }
686
687                 if (listen != 0)
688                 {
689                         if (network_bind_socket (se, ai_ptr) != 0)
690                         {
691                                 close (se->fd);
692                                 free (se->addr);
693                                 free (se);
694                                 continue;
695                         }
696                 }
697                 else /* listen == 0 */
698                 {
699                         network_set_ttl (se, ai_ptr);
700                 }
701
702                 if (se_tail == NULL)
703                 {
704                         se_head = se;
705                         se_tail = se;
706                 }
707                 else
708                 {
709                         se_tail->next = se;
710                         se_tail = se;
711                 }
712
713                 /* We don't open more than one write-socket per node/service pair.. */
714                 if (listen == 0)
715                         break;
716         }
717
718         freeaddrinfo (ai_list);
719
720         return (se_head);
721 } /* sockent_t *network_create_socket */
722
723 static sockent_t *network_create_default_socket (int listen)
724 {
725         sockent_t *se_ptr  = NULL;
726         sockent_t *se_head = NULL;
727         sockent_t *se_tail = NULL;
728
729         se_ptr = network_create_socket (NET_DEFAULT_V6_ADDR,
730                         NET_DEFAULT_PORT, listen);
731
732         /* Don't send to the same machine in IPv6 and IPv4 if both are available. */
733         if ((listen == 0) && (se_ptr != NULL))
734                 return (se_ptr);
735
736         if (se_ptr != NULL)
737         {
738                 se_head = se_ptr;
739                 se_tail = se_ptr;
740                 while (se_tail->next != NULL)
741                         se_tail = se_tail->next;
742         }
743
744         se_ptr = network_create_socket (NET_DEFAULT_V4_ADDR, NET_DEFAULT_PORT, listen);
745
746         if (se_tail == NULL)
747                 return (se_ptr);
748
749         se_tail->next = se_ptr;
750         return (se_head);
751 } /* sockent_t *network_create_default_socket */
752
753 static int network_add_listen_socket (const char *node, const char *service)
754 {
755         sockent_t *se;
756         sockent_t *se_ptr;
757         int se_num = 0;
758
759         if (service == NULL)
760                 service = NET_DEFAULT_PORT;
761
762         if (node == NULL)
763                 se = network_create_default_socket (1 /* listen == true */);
764         else
765                 se = network_create_socket (node, service, 1 /* listen == true */);
766
767         if (se == NULL)
768                 return (-1);
769
770         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
771                 se_num++;
772
773         listen_sockets = (struct pollfd *) realloc (listen_sockets,
774                         (listen_sockets_num + se_num)
775                         * sizeof (struct pollfd));
776
777         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
778         {
779                 listen_sockets[listen_sockets_num].fd = se_ptr->fd;
780                 listen_sockets[listen_sockets_num].events = POLLIN | POLLPRI;
781                 listen_sockets[listen_sockets_num].revents = 0;
782                 listen_sockets_num++;
783         } /* for (se) */
784
785         free_sockent (se);
786         return (0);
787 } /* int network_add_listen_socket */
788
789 static int network_add_sending_socket (const char *node, const char *service)
790 {
791         sockent_t *se;
792         sockent_t *se_ptr;
793
794         if (service == NULL)
795                 service = NET_DEFAULT_PORT;
796
797         if (node == NULL)
798                 se = network_create_default_socket (0 /* listen == false */);
799         else
800                 se = network_create_socket (node, service, 0 /* listen == false */);
801
802         if (se == NULL)
803                 return (-1);
804
805         if (sending_sockets == NULL)
806         {
807                 sending_sockets = se;
808                 return (0);
809         }
810
811         for (se_ptr = sending_sockets; se_ptr->next != NULL; se_ptr = se_ptr->next)
812                 /* seek end */;
813
814         se_ptr->next = se;
815         return (0);
816 } /* int network_get_listen_socket */
817
818 int network_receive (void)
819 {
820         char buffer[BUFF_SIZE];
821         int  buffer_len;
822
823         int i;
824         int status;
825
826         if (listen_sockets_num == 0)
827                 network_add_listen_socket (NULL, NULL);
828
829         if (listen_sockets_num == 0)
830         {
831                 syslog (LOG_ERR, "network: Failed to open a listening socket.");
832                 return (-1);
833         }
834
835         while (listen_loop == 0)
836         {
837                 status = poll (listen_sockets, listen_sockets_num, -1);
838
839                 if (status <= 0)
840                 {
841                         if (errno == EINTR)
842                                 continue;
843                         syslog (LOG_ERR, "poll failed: %s",
844                                         strerror (errno));
845                         return (-1);
846                 }
847
848                 for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
849                 {
850                         if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
851                                 continue;
852                         status--;
853
854                         buffer_len = recv (listen_sockets[i].fd,
855                                         buffer, sizeof (buffer),
856                                         0 /* no flags */);
857                         if (buffer_len < 0)
858                         {
859                                 syslog (LOG_ERR, "recv failed: %s", strerror (errno));
860                                 return (-1);
861                         }
862
863                         parse_packet (buffer, buffer_len);
864                 } /* for (listen_sockets) */
865         } /* while (listen_loop == 0) */
866
867         return (0);
868 }
869
870 static void *receive_thread (void *arg)
871 {
872         return (network_receive () ? (void *) 1 : (void *) 0);
873 } /* void *receive_thread */
874
875 static void network_send_buffer (const char *buffer, int buffer_len)
876 {
877         sockent_t *se;
878         int status;
879
880         DBG ("buffer_len = %i", buffer_len);
881
882         for (se = sending_sockets; se != NULL; se = se->next)
883         {
884                 while (42)
885                 {
886                         status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */,
887                                         (struct sockaddr *) se->addr, se->addrlen);
888                         if (status < 0)
889                         {
890                                 if (errno == EINTR)
891                                         continue;
892                                 syslog (LOG_ERR, "network plugin: sendto failed: %s",
893                                                 strerror (errno));
894                                 break;
895                         }
896
897                         break;
898                 } /* while (42) */
899         } /* for (sending_sockets) */
900 } /* void network_send_buffer */
901
902 static int add_to_buffer (char *buffer, int buffer_size,
903                 value_list_t *vl_def, char *type_def,
904                 const data_set_t *ds, const value_list_t *vl)
905 {
906         if (strcmp (vl_def->host, vl->host) != 0)
907         {
908                 if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
909                                         vl->host, strlen (vl->host)) != 0)
910                         return (-1);
911                 strcpy (vl_def->host, vl->host);
912                 DBG ("host = %s", vl->host);
913         }
914
915         if (vl_def->time != vl->time)
916         {
917                 if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
918                                         (uint64_t) vl->time))
919                         return (-1);
920                 vl_def->time = vl->time;
921                 DBG ("time = %u", (unsigned int) vl->time);
922         }
923
924         if (strcmp (vl_def->plugin, vl->plugin) != 0)
925         {
926                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
927                                         vl->plugin, strlen (vl->plugin)) != 0)
928                         return (-1);
929                 strcpy (vl_def->plugin, vl->plugin);
930                 DBG ("plugin = %s", vl->plugin);
931         }
932
933         if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
934         {
935                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
936                                         vl->plugin_instance,
937                                         strlen (vl->plugin_instance)) != 0)
938                         return (-1);
939                 strcpy (vl_def->plugin_instance, vl->plugin_instance);
940                 DBG ("plugin_instance = %s", vl->plugin_instance);
941         }
942
943         if (strcmp (type_def, ds->type) != 0)
944         {
945                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
946                                         ds->type, strlen (ds->type)) != 0)
947                         return (-1);
948                 strcpy (type_def, ds->type);
949                 DBG ("type = %s", ds->type);
950         }
951
952         if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
953         {
954                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
955                                         vl->type_instance,
956                                         strlen (vl->type_instance)) != 0)
957                         return (-1);
958                 strcpy (vl_def->type_instance, vl->type_instance);
959                 DBG ("type_instance = %s", vl->type_instance);
960         }
961         
962         if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
963                 return (-1);
964
965         return (buffer_size);
966 } /* int add_to_buffer */
967
968 static void flush_buffer (void)
969 {
970         network_send_buffer (send_buffer, send_buffer_fill);
971         send_buffer_ptr  = send_buffer;
972         send_buffer_fill = 0;
973         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
974         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
975 }
976
977 static int network_write (const data_set_t *ds, const value_list_t *vl)
978 {
979         int status;
980         /* TODO: lock buffer */
981         status = add_to_buffer (send_buffer_ptr,
982                         sizeof (send_buffer) - send_buffer_fill,
983                         &send_buffer_vl, send_buffer_type,
984                         ds, vl);
985         if (status >= 0)
986         {
987                 send_buffer_fill += status;
988                 send_buffer_ptr  += status;
989         }
990         else
991         {
992                 flush_buffer ();
993
994                 status = add_to_buffer (send_buffer_ptr,
995                                 sizeof (send_buffer) - send_buffer_fill,
996                                 &send_buffer_vl, send_buffer_type,
997                                 ds, vl);
998
999                 if (status >= 0)
1000                 {
1001                         send_buffer_fill += status;
1002                         send_buffer_ptr  += status;
1003                 }
1004         }
1005
1006         if (status < 0)
1007         {
1008                 syslog (LOG_ERR, "network plugin: Unable to append to the "
1009                                 "buffer for some weird reason");
1010         }
1011         else if ((sizeof (send_buffer) - send_buffer_fill) < 15)
1012         {
1013                 flush_buffer ();
1014         }
1015         /* TODO: unlock buffer */
1016
1017         return ((status < 0) ? -1 : 0);
1018 } /* int network_write */
1019
1020 static int network_config (const char *key, const char *val)
1021 {
1022         char *node;
1023         char *service;
1024
1025         char *fields[3];
1026         int   fields_num;
1027
1028         if ((strcasecmp ("Listen", key) == 0)
1029                         || (strcasecmp ("Server", key) == 0))
1030         {
1031                 char *val_cpy = strdup (val);
1032                 if (val_cpy == NULL)
1033                         return (1);
1034
1035                 service = NET_DEFAULT_PORT;
1036                 fields_num = strsplit (val_cpy, fields, 3);
1037                 if ((fields_num != 1)
1038                                 && (fields_num != 2))
1039                         return (1);
1040                 else if (fields_num == 2)
1041                         service = fields[1];
1042                 node = fields[0];
1043
1044                 if (strcasecmp ("Listen", key) == 0)
1045                         network_add_listen_socket (node, service);
1046                 else
1047                         network_add_sending_socket (node, service);
1048         }
1049         else if (strcasecmp ("TimeToLive", key) == 0)
1050         {
1051                 int tmp = atoi (val);
1052                 if ((tmp > 0) && (tmp < 256))
1053                         network_config_ttl = tmp;
1054                 else
1055                         return (1);
1056         }
1057         else
1058         {
1059                 return (-1);
1060         }
1061         return (0);
1062 }
1063
1064 static int network_shutdown (void)
1065 {
1066         DBG ("Shutting down.");
1067
1068         listen_loop++;
1069
1070         pthread_kill (listen_thread, SIGTERM);
1071         pthread_join (listen_thread, NULL /* no return value */);
1072
1073         listen_thread = 0;
1074
1075         return (0);
1076 }
1077
1078 static int network_init (void)
1079 {
1080         plugin_register_shutdown ("network", network_shutdown);
1081
1082         send_buffer_ptr  = send_buffer;
1083         send_buffer_fill = 0;
1084         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1085         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1086
1087         /* setup socket(s) and so on */
1088         if (sending_sockets != NULL)
1089                 plugin_register_write ("network", network_write);
1090
1091         if ((listen_sockets_num != 0) && (listen_thread == 0))
1092         {
1093                 int status;
1094
1095                 status = pthread_create (&listen_thread, NULL /* no attributes */,
1096                                 receive_thread, NULL /* no argument */);
1097
1098                 if (status != 0)
1099                         syslog (LOG_ERR, "network: pthread_create failed: %s",
1100                                         strerror (errno));
1101         }
1102         return (0);
1103 } /* int network_init */
1104
1105 void module_register (void)
1106 {
1107         plugin_register_config ("network", network_config,
1108                         config_keys, config_keys_num);
1109         plugin_register_init   ("network", network_init);
1110 }