network plugin: Fix shutdown code.
[collectd.git] / src / network.c
1 /**
2  * collectd - src/network.c
3  * Copyright (C) 2005-2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26
27 #include "network.h"
28
29 #if HAVE_PTHREAD_H
30 # include <pthread.h>
31 #endif
32 #if HAVE_SYS_SOCKET_H
33 # include <sys/socket.h>
34 #endif
35 #if HAVE_NETDB_H
36 # include <netdb.h>
37 #endif
38 #if HAVE_NETINET_IN_H
39 # include <netinet/in.h>
40 #endif
41 #if HAVE_ARPA_INET_H
42 # include <arpa/inet.h>
43 #endif
44 #if HAVE_POLL_H
45 # include <poll.h>
46 #endif
47
48 /* 1500 - 40 - 8  =  Ethernet packet - IPv6 header - UDP header */
49 /* #define BUFF_SIZE 1452 */
50
51 #ifndef IPV6_ADD_MEMBERSHIP
52 # ifdef IPV6_JOIN_GROUP
53 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
54 # else
55 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
56 # endif
57 #endif /* !IP_ADD_MEMBERSHIP */
58
59 #define BUFF_SIZE 1024
60
61 /*
62  * Private data types
63  */
64 typedef struct sockent
65 {
66         int                      fd;
67         struct sockaddr_storage *addr;
68         socklen_t                addrlen;
69         struct sockent          *next;
70 } sockent_t;
71
72 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
73  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
74  * +-------+-----------------------+-------------------------------+
75  * ! Ver.  !                       ! Length                        !
76  * +-------+-----------------------+-------------------------------+
77  */
78 struct part_header_s
79 {
80         uint16_t type;
81         uint16_t length;
82 };
83 typedef struct part_header_s part_header_t;
84
85 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
86  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
87  * +-------------------------------+-------------------------------+
88  * ! Type                          ! Length                        !
89  * +-------------------------------+-------------------------------+
90  * : (Length - 4) Bytes                                            :
91  * +---------------------------------------------------------------+
92  */
93 struct part_string_s
94 {
95         part_header_t *head;
96         char *value;
97 };
98 typedef struct part_string_s part_string_t;
99
100 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
101  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
102  * +-------------------------------+-------------------------------+
103  * ! Type                          ! Length                        !
104  * +-------------------------------+-------------------------------+
105  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
106  * +---------------------------------------------------------------+
107  */
108 struct part_number_s
109 {
110         part_header_t *head;
111         uint64_t *value;
112 };
113 typedef struct part_number_s part_number_t;
114
115 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
116  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
117  * +-------------------------------+-------------------------------+
118  * ! Type                          ! Length                        !
119  * +-------------------------------+---------------+---------------+
120  * ! Num of values                 ! Type0         ! Type1         !
121  * +-------------------------------+---------------+---------------+
122  * ! Value0                                                        !
123  * !                                                               !
124  * +---------------------------------------------------------------+
125  * ! Value1                                                        !
126  * !                                                               !
127  * +---------------------------------------------------------------+
128  */
129 struct part_values_s
130 {
131         part_header_t *head;
132         uint16_t *num_values;
133         uint8_t  *values_types;
134         value_t  *values;
135 };
136 typedef struct part_values_s part_values_t;
137
138 /*
139  * Private variables
140  */
141 static const char *config_keys[] =
142 {
143         "Listen",
144         "Server",
145         "TimeToLive",
146         NULL
147 };
148 static int config_keys_num = 3;
149
150 static int network_config_ttl = 0;
151
152 static sockent_t *sending_sockets = NULL;
153
154 static struct pollfd *listen_sockets = NULL;
155 static int listen_sockets_num = 0;
156 static pthread_t listen_thread = 0;
157 static int listen_loop = 0;
158
159 static char         send_buffer[BUFF_SIZE];
160 static char        *send_buffer_ptr;
161 static int          send_buffer_fill;
162 static value_list_t send_buffer_vl = VALUE_LIST_INIT;
163 static char         send_buffer_type[DATA_MAX_NAME_LEN];
164 static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
165
166 /*
167  * Private functions
168  */
169 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
170                 const data_set_t *ds, const value_list_t *vl)
171 {
172         part_values_t pv;
173         int i;
174
175         i = 6 + (9 * vl->values_len);
176         if (*ret_buffer_len < i)
177                 return (-1);
178         *ret_buffer_len -= i;
179
180         pv.head = (part_header_t *) *ret_buffer;
181         pv.num_values = (uint16_t *) (pv.head + 1);
182         pv.values_types = (uint8_t *) (pv.num_values + 1);
183         pv.values = (value_t *) (pv.values_types + vl->values_len);
184         *ret_buffer = (void *) (pv.values + vl->values_len);
185
186         pv.head->type = htons (TYPE_VALUES);
187         pv.head->length = htons (6 + (9 * vl->values_len));
188         *pv.num_values = htons ((uint16_t) vl->values_len);
189         
190         for (i = 0; i < vl->values_len; i++)
191         {
192                 if (ds->ds[i].type == DS_TYPE_COUNTER)
193                 {
194                         pv.values_types[i] = DS_TYPE_COUNTER;
195                         pv.values[i].counter = htonll (vl->values[i].counter);
196                 }
197                 else
198                 {
199                         pv.values_types[i] = DS_TYPE_GAUGE;
200                         pv.values[i].gauge = vl->values[i].gauge;
201                 }
202         } /* for (values) */
203
204         return (0);
205 } /* int write_part_values */
206
207 static int write_part_number (char **ret_buffer, int *ret_buffer_len,
208                 int type, uint64_t value)
209 {
210         part_number_t pn;
211
212         if (*ret_buffer_len < 12)
213                 return (-1);
214
215         pn.head = (part_header_t *) *ret_buffer;
216         pn.value = (uint64_t *) (pn.head + 1);
217
218         pn.head->type = htons (type);
219         pn.head->length = htons (12);
220         *pn.value = htonll (value);
221
222         *ret_buffer = (char *) (pn.value + 1);
223         *ret_buffer_len -= 12;
224
225         return (0);
226 } /* int write_part_number */
227
228 static int write_part_string (char **ret_buffer, int *ret_buffer_len,
229                 int type, const char *str, int str_len)
230 {
231         part_string_t ps;
232         int len;
233
234         len = 4 + str_len + 1;
235         if (*ret_buffer_len < len)
236                 return (-1);
237         *ret_buffer_len -= len;
238
239         ps.head = (part_header_t *) *ret_buffer;
240         ps.value = (char *) (ps.head + 1);
241
242         ps.head->type = htons ((uint16_t) type);
243         ps.head->length = htons ((uint16_t) str_len + 5);
244         if (str_len > 0)
245                 memcpy (ps.value, str, str_len);
246         ps.value[str_len] = '\0';
247         *ret_buffer = (void *) (ps.value + (str_len + 1));
248
249         return (0);
250 } /* int write_part_string */
251
252 static int parse_part_values (void **ret_buffer, int *ret_buffer_len,
253                 value_t **ret_values, int *ret_num_values)
254 {
255         char *buffer = *ret_buffer;
256         int   buffer_len = *ret_buffer_len;
257         part_values_t pv;
258         int   i;
259
260         uint16_t h_length;
261         uint16_t h_type;
262         uint16_t h_num;
263
264         if (buffer_len < (15))
265         {
266                 DEBUG ("packet is too short: buffer_len = %i", buffer_len);
267                 return (-1);
268         }
269
270         pv.head = (part_header_t *) buffer;
271         h_length = ntohs (pv.head->length);
272         h_type = ntohs (pv.head->type);
273
274         assert (h_type == TYPE_VALUES);
275
276         pv.num_values = (uint16_t *) (pv.head + 1);
277         h_num = ntohs (*pv.num_values);
278
279         if (h_num != ((h_length - 6) / 9))
280         {
281                 DEBUG ("`length' and `num of values' don't match");
282                 return (-1);
283         }
284
285         pv.values_types = (uint8_t *) (pv.num_values + 1);
286         pv.values = (value_t *) (pv.values_types + h_num);
287
288         for (i = 0; i < h_num; i++)
289                 if (pv.values_types[i] == DS_TYPE_COUNTER)
290                         pv.values[i].counter = ntohll (pv.values[i].counter);
291
292         *ret_buffer     = (void *) (pv.values + h_num);
293         *ret_buffer_len = buffer_len - h_length;
294         *ret_num_values = h_num;
295         *ret_values     = pv.values;
296
297         return (0);
298 } /* int parse_part_values */
299
300 static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
301                 uint64_t *value)
302 {
303         part_number_t pn;
304         uint16_t len;
305
306         pn.head = (part_header_t *) *ret_buffer;
307         pn.value = (uint64_t *) (pn.head + 1);
308
309         len = ntohs (pn.head->length);
310         if (len != 12)
311                 return (-1);
312         if (len > *ret_buffer_len)
313                 return (-1);
314         *value = ntohll (*pn.value);
315
316         *ret_buffer = (void *) (pn.value + 1);
317         *ret_buffer_len -= len;
318
319         return (0);
320 } /* int parse_part_number */
321
322 static int parse_part_string (void **ret_buffer, int *ret_buffer_len,
323                 char *output, int output_len)
324 {
325         char *buffer = *ret_buffer;
326         int   buffer_len = *ret_buffer_len;
327         part_string_t ps;
328
329         uint16_t h_length;
330         uint16_t h_type;
331
332         DEBUG ("network plugin: parse_part_string: ret_buffer = %p;"
333                         " ret_buffer_len = %i; output = %p; output_len = %i;",
334                         *ret_buffer, *ret_buffer_len,
335                         (void *) output, output_len);
336
337         ps.head = (part_header_t *) buffer;
338
339         h_length = ntohs (ps.head->length);
340         h_type = ntohs (ps.head->type);
341
342         DEBUG ("network plugin: parse_part_string: length = %hu; type = %hu;",
343                         h_length, h_type);
344
345         if (buffer_len < h_length)
346         {
347                 DEBUG ("packet is too short");
348                 return (-1);
349         }
350         assert ((h_type == TYPE_HOST)
351                         || (h_type == TYPE_PLUGIN)
352                         || (h_type == TYPE_PLUGIN_INSTANCE)
353                         || (h_type == TYPE_TYPE)
354                         || (h_type == TYPE_TYPE_INSTANCE));
355
356         ps.value = buffer + 4;
357         if (ps.value[h_length - 5] != '\0')
358         {
359                 DEBUG ("String does not end with a nullbyte");
360                 return (-1);
361         }
362
363         if (output_len < (h_length - 4))
364         {
365                 DEBUG ("output buffer is too small");
366                 return (-1);
367         }
368         strcpy (output, ps.value);
369
370         DEBUG ("network plugin: parse_part_string: output = %s", output);
371
372         *ret_buffer = (void *) (buffer + h_length);
373         *ret_buffer_len = buffer_len - h_length;
374
375         return (0);
376 } /* int parse_part_string */
377
378 static int parse_packet (void *buffer, int buffer_len)
379 {
380         part_header_t *header;
381         int status;
382
383         value_list_t vl = VALUE_LIST_INIT;
384         char type[DATA_MAX_NAME_LEN];
385
386         DEBUG ("buffer = %p; buffer_len = %i;", buffer, buffer_len);
387
388         memset (&vl, '\0', sizeof (vl));
389         memset (&type, '\0', sizeof (type));
390         status = 0;
391
392         while ((status == 0) && (buffer_len > sizeof (part_header_t)))
393         {
394                 header = (part_header_t *) buffer;
395
396                 if (ntohs (header->length) > buffer_len)
397                         break;
398                 /* Assure that this loop terminates eventually */
399                 if (ntohs (header->length) < 4)
400                         break;
401
402                 if (ntohs (header->type) == TYPE_VALUES)
403                 {
404                         status = parse_part_values (&buffer, &buffer_len,
405                                         &vl.values, &vl.values_len);
406
407                         if (status != 0)
408                         {
409                                 DEBUG ("parse_part_values failed.");
410                                 break;
411                         }
412
413                         if ((vl.time > 0)
414                                         && (strlen (vl.host) > 0)
415                                         && (strlen (vl.plugin) > 0)
416                                         && (strlen (type) > 0))
417                         {
418                                 DEBUG ("dispatching values");
419                                 plugin_dispatch_values (type, &vl);
420                         }
421                         else
422                         {
423                                 DEBUG ("NOT dispatching values");
424                         }
425                 }
426                 else if (ntohs (header->type) == TYPE_TIME)
427                 {
428                         uint64_t tmp = 0;
429                         status = parse_part_number (&buffer, &buffer_len, &tmp);
430                         if (status == 0)
431                                 vl.time = (time_t) tmp;
432                 }
433                 else if (ntohs (header->type) == TYPE_HOST)
434                 {
435                         status = parse_part_string (&buffer, &buffer_len,
436                                         vl.host, sizeof (vl.host));
437                         DEBUG ("network plugin: parse_packet: vl.host = %s", vl.host);
438                 }
439                 else if (ntohs (header->type) == TYPE_PLUGIN)
440                 {
441                         status = parse_part_string (&buffer, &buffer_len,
442                                         vl.plugin, sizeof (vl.plugin));
443                         DEBUG ("network plugin: parse_packet: vl.plugin = %s", vl.plugin);
444                 }
445                 else if (ntohs (header->type) == TYPE_PLUGIN_INSTANCE)
446                 {
447                         status = parse_part_string (&buffer, &buffer_len,
448                                         vl.plugin_instance, sizeof (vl.plugin_instance));
449                         DEBUG ("network plugin: parse_packet: vl.plugin_instance = %s", vl.plugin_instance);
450                 }
451                 else if (ntohs (header->type) == TYPE_TYPE)
452                 {
453                         status = parse_part_string (&buffer, &buffer_len,
454                                         type, sizeof (type));
455                         DEBUG ("network plugin: parse_packet: type = %s", type);
456                 }
457                 else if (ntohs (header->type) == TYPE_TYPE_INSTANCE)
458                 {
459                         status = parse_part_string (&buffer, &buffer_len,
460                                         vl.type_instance, sizeof (vl.type_instance));
461                         DEBUG ("network type: parse_packet: vl.type_instance = %s", vl.type_instance);
462                 }
463                 else
464                 {
465                         DEBUG ("Unknown part type: 0x%0hx", ntohs (header->type));
466                         buffer = ((char *) buffer) + ntohs (header->length);
467                 }
468         } /* while (buffer_len > sizeof (part_header_t)) */
469
470         return (0);
471 } /* int parse_packet */
472
473 static void free_sockent (sockent_t *se)
474 {
475         sockent_t *next;
476         while (se != NULL)
477         {
478                 next = se->next;
479                 free (se->addr);
480                 free (se);
481                 se = next;
482         }
483 } /* void free_sockent */
484
485 /*
486  * int network_set_ttl
487  *
488  * Set the `IP_MULTICAST_TTL', `IP_TTL', `IPV6_MULTICAST_HOPS' or
489  * `IPV6_UNICAST_HOPS', depending on which option is applicable.
490  *
491  * The `struct addrinfo' is used to destinguish between unicast and multicast
492  * sockets.
493  */
494 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
495 {
496         if ((network_config_ttl < 1) || (network_config_ttl > 255))
497                 return (-1);
498
499         DEBUG ("ttl = %i", network_config_ttl);
500
501         if (ai->ai_family == AF_INET)
502         {
503                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
504                 int optname;
505
506                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
507                         optname = IP_MULTICAST_TTL;
508                 else
509                         optname = IP_TTL;
510
511                 if (setsockopt (se->fd, IPPROTO_IP, optname,
512                                         &network_config_ttl,
513                                         sizeof (network_config_ttl)) == -1)
514                 {
515                         char errbuf[1024];
516                         ERROR ("setsockopt: %s",
517                                         sstrerror (errno, errbuf, sizeof (errbuf)));
518                         return (-1);
519                 }
520         }
521         else if (ai->ai_family == AF_INET6)
522         {
523                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
524                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
525                 int optname;
526
527                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
528                         optname = IPV6_MULTICAST_HOPS;
529                 else
530                         optname = IPV6_UNICAST_HOPS;
531
532                 if (setsockopt (se->fd, IPPROTO_IPV6, optname,
533                                         &network_config_ttl,
534                                         sizeof (network_config_ttl)) == -1)
535                 {
536                         char errbuf[1024];
537                         ERROR ("setsockopt: %s",
538                                         sstrerror (errno, errbuf,
539                                                 sizeof (errbuf)));
540                         return (-1);
541                 }
542         }
543
544         return (0);
545 } /* int network_set_ttl */
546
547 static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
548 {
549         int loop = 1;
550
551         DEBUG ("fd = %i; calling `bind'", se->fd);
552
553         if (bind (se->fd, ai->ai_addr, ai->ai_addrlen) == -1)
554         {
555                 char errbuf[1024];
556                 ERROR ("bind: %s",
557                                 sstrerror (errno, errbuf, sizeof (errbuf)));
558                 return (-1);
559         }
560
561         if (ai->ai_family == AF_INET)
562         {
563                 struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
564                 if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
565                 {
566                         struct ip_mreq mreq;
567
568                         DEBUG ("fd = %i; IPv4 multicast address found", se->fd);
569
570                         mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
571                         mreq.imr_interface.s_addr = htonl (INADDR_ANY);
572
573                         if (setsockopt (se->fd, IPPROTO_IP, IP_MULTICAST_LOOP,
574                                                 &loop, sizeof (loop)) == -1)
575                         {
576                                 char errbuf[1024];
577                                 ERROR ("setsockopt: %s",
578                                                 sstrerror (errno, errbuf,
579                                                         sizeof (errbuf)));
580                                 return (-1);
581                         }
582
583                         if (setsockopt (se->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
584                                                 &mreq, sizeof (mreq)) == -1)
585                         {
586                                 char errbuf[1024];
587                                 ERROR ("setsockopt: %s",
588                                                 sstrerror (errno, errbuf,
589                                                         sizeof (errbuf)));
590                                 return (-1);
591                         }
592                 }
593         }
594         else if (ai->ai_family == AF_INET6)
595         {
596                 /* Useful example: http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
597                 struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr;
598                 if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
599                 {
600                         struct ipv6_mreq mreq;
601
602                         DEBUG ("fd = %i; IPv6 multicast address found", se->fd);
603
604                         memcpy (&mreq.ipv6mr_multiaddr,
605                                         &addr->sin6_addr,
606                                         sizeof (addr->sin6_addr));
607
608                         /* http://developer.apple.com/documentation/Darwin/Reference/ManPages/man4/ip6.4.html
609                          * ipv6mr_interface may be set to zeroes to
610                          * choose the default multicast interface or to
611                          * the index of a particular multicast-capable
612                          * interface if the host is multihomed.
613                          * Membership is associ-associated with a
614                          * single interface; programs running on
615                          * multihomed hosts may need to join the same
616                          * group on more than one interface.*/
617                         mreq.ipv6mr_interface = 0;
618
619                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
620                                                 &loop, sizeof (loop)) == -1)
621                         {
622                                 char errbuf[1024];
623                                 ERROR ("setsockopt: %s",
624                                                 sstrerror (errno, errbuf,
625                                                         sizeof (errbuf)));
626                                 return (-1);
627                         }
628
629                         if (setsockopt (se->fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
630                                                 &mreq, sizeof (mreq)) == -1)
631                         {
632                                 char errbuf[1024];
633                                 ERROR ("setsockopt: %s",
634                                                 sstrerror (errno, errbuf,
635                                                         sizeof (errbuf)));
636                                 return (-1);
637                         }
638                 }
639         }
640
641         return (0);
642 } /* int network_bind_socket */
643
644 static sockent_t *network_create_socket (const char *node,
645                 const char *service,
646                 int listen)
647 {
648         struct addrinfo  ai_hints;
649         struct addrinfo *ai_list, *ai_ptr;
650         int              ai_return;
651
652         sockent_t *se_head = NULL;
653         sockent_t *se_tail = NULL;
654
655         DEBUG ("node = %s, service = %s", node, service);
656
657         memset (&ai_hints, '\0', sizeof (ai_hints));
658         ai_hints.ai_flags    = 0;
659 #ifdef AI_PASSIVE
660         ai_hints.ai_flags |= AI_PASSIVE;
661 #endif
662 #ifdef AI_ADDRCONFIG
663         ai_hints.ai_flags |= AI_ADDRCONFIG;
664 #endif
665         ai_hints.ai_family   = AF_UNSPEC;
666         ai_hints.ai_socktype = SOCK_DGRAM;
667         ai_hints.ai_protocol = IPPROTO_UDP;
668
669         ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
670         if (ai_return != 0)
671         {
672                 char errbuf[1024];
673                 ERROR ("getaddrinfo (%s, %s): %s",
674                                 (node == NULL) ? "(null)" : node,
675                                 (service == NULL) ? "(null)" : service,
676                                 (ai_return == EAI_SYSTEM)
677                                 ? sstrerror (errno, errbuf, sizeof (errbuf))
678                                 : gai_strerror (ai_return));
679                 return (NULL);
680         }
681
682         for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
683         {
684                 sockent_t *se;
685
686                 if ((se = (sockent_t *) malloc (sizeof (sockent_t))) == NULL)
687                 {
688                         char errbuf[1024];
689                         ERROR ("malloc: %s",
690                                         sstrerror (errno, errbuf,
691                                                 sizeof (errbuf)));
692                         continue;
693                 }
694
695                 if ((se->addr = (struct sockaddr_storage *) malloc (sizeof (struct sockaddr_storage))) == NULL)
696                 {
697                         char errbuf[1024];
698                         ERROR ("malloc: %s",
699                                         sstrerror (errno, errbuf,
700                                                 sizeof (errbuf)));
701                         free (se);
702                         continue;
703                 }
704
705                 assert (sizeof (struct sockaddr_storage) >= ai_ptr->ai_addrlen);
706                 memset (se->addr, '\0', sizeof (struct sockaddr_storage));
707                 memcpy (se->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
708                 se->addrlen = ai_ptr->ai_addrlen;
709
710                 se->fd   = socket (ai_ptr->ai_family,
711                                 ai_ptr->ai_socktype,
712                                 ai_ptr->ai_protocol);
713                 se->next = NULL;
714
715                 if (se->fd == -1)
716                 {
717                         char errbuf[1024];
718                         ERROR ("socket: %s",
719                                         sstrerror (errno, errbuf,
720                                                 sizeof (errbuf)));
721                         free (se->addr);
722                         free (se);
723                         continue;
724                 }
725
726                 if (listen != 0)
727                 {
728                         if (network_bind_socket (se, ai_ptr) != 0)
729                         {
730                                 close (se->fd);
731                                 free (se->addr);
732                                 free (se);
733                                 continue;
734                         }
735                 }
736                 else /* listen == 0 */
737                 {
738                         network_set_ttl (se, ai_ptr);
739                 }
740
741                 if (se_tail == NULL)
742                 {
743                         se_head = se;
744                         se_tail = se;
745                 }
746                 else
747                 {
748                         se_tail->next = se;
749                         se_tail = se;
750                 }
751
752                 /* We don't open more than one write-socket per node/service pair.. */
753                 if (listen == 0)
754                         break;
755         }
756
757         freeaddrinfo (ai_list);
758
759         return (se_head);
760 } /* sockent_t *network_create_socket */
761
762 static sockent_t *network_create_default_socket (int listen)
763 {
764         sockent_t *se_ptr  = NULL;
765         sockent_t *se_head = NULL;
766         sockent_t *se_tail = NULL;
767
768         se_ptr = network_create_socket (NET_DEFAULT_V6_ADDR,
769                         NET_DEFAULT_PORT, listen);
770
771         /* Don't send to the same machine in IPv6 and IPv4 if both are available. */
772         if ((listen == 0) && (se_ptr != NULL))
773                 return (se_ptr);
774
775         if (se_ptr != NULL)
776         {
777                 se_head = se_ptr;
778                 se_tail = se_ptr;
779                 while (se_tail->next != NULL)
780                         se_tail = se_tail->next;
781         }
782
783         se_ptr = network_create_socket (NET_DEFAULT_V4_ADDR, NET_DEFAULT_PORT, listen);
784
785         if (se_tail == NULL)
786                 return (se_ptr);
787
788         se_tail->next = se_ptr;
789         return (se_head);
790 } /* sockent_t *network_create_default_socket */
791
792 static int network_add_listen_socket (const char *node, const char *service)
793 {
794         sockent_t *se;
795         sockent_t *se_ptr;
796         int se_num = 0;
797
798         if (service == NULL)
799                 service = NET_DEFAULT_PORT;
800
801         if (node == NULL)
802                 se = network_create_default_socket (1 /* listen == true */);
803         else
804                 se = network_create_socket (node, service, 1 /* listen == true */);
805
806         if (se == NULL)
807                 return (-1);
808
809         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
810                 se_num++;
811
812         listen_sockets = (struct pollfd *) realloc (listen_sockets,
813                         (listen_sockets_num + se_num)
814                         * sizeof (struct pollfd));
815
816         for (se_ptr = se; se_ptr != NULL; se_ptr = se_ptr->next)
817         {
818                 listen_sockets[listen_sockets_num].fd = se_ptr->fd;
819                 listen_sockets[listen_sockets_num].events = POLLIN | POLLPRI;
820                 listen_sockets[listen_sockets_num].revents = 0;
821                 listen_sockets_num++;
822         } /* for (se) */
823
824         free_sockent (se);
825         return (0);
826 } /* int network_add_listen_socket */
827
828 static int network_add_sending_socket (const char *node, const char *service)
829 {
830         sockent_t *se;
831         sockent_t *se_ptr;
832
833         if (service == NULL)
834                 service = NET_DEFAULT_PORT;
835
836         if (node == NULL)
837                 se = network_create_default_socket (0 /* listen == false */);
838         else
839                 se = network_create_socket (node, service, 0 /* listen == false */);
840
841         if (se == NULL)
842                 return (-1);
843
844         if (sending_sockets == NULL)
845         {
846                 sending_sockets = se;
847                 return (0);
848         }
849
850         for (se_ptr = sending_sockets; se_ptr->next != NULL; se_ptr = se_ptr->next)
851                 /* seek end */;
852
853         se_ptr->next = se;
854         return (0);
855 } /* int network_get_listen_socket */
856
857 int network_receive (void)
858 {
859         char buffer[BUFF_SIZE];
860         int  buffer_len;
861
862         int i;
863         int status;
864
865         if (listen_sockets_num == 0)
866                 network_add_listen_socket (NULL, NULL);
867
868         if (listen_sockets_num == 0)
869         {
870                 ERROR ("network: Failed to open a listening socket.");
871                 return (-1);
872         }
873
874         while (listen_loop == 0)
875         {
876                 status = poll (listen_sockets, listen_sockets_num, -1);
877
878                 if (status <= 0)
879                 {
880                         char errbuf[1024];
881                         if (errno == EINTR)
882                                 continue;
883                         ERROR ("poll failed: %s",
884                                         sstrerror (errno, errbuf, sizeof (errbuf)));
885                         return (-1);
886                 }
887
888                 for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
889                 {
890                         if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
891                                 continue;
892                         status--;
893
894                         buffer_len = recv (listen_sockets[i].fd,
895                                         buffer, sizeof (buffer),
896                                         0 /* no flags */);
897                         if (buffer_len < 0)
898                         {
899                                 char errbuf[1024];
900                                 ERROR ("recv failed: %s",
901                                                 sstrerror (errno, errbuf,
902                                                         sizeof (errbuf)));
903                                 return (-1);
904                         }
905
906                         parse_packet (buffer, buffer_len);
907                 } /* for (listen_sockets) */
908         } /* while (listen_loop == 0) */
909
910         return (0);
911 }
912
913 static void *receive_thread (void *arg)
914 {
915         return (network_receive () ? (void *) 1 : (void *) 0);
916 } /* void *receive_thread */
917
918 static void network_send_buffer (const char *buffer, int buffer_len)
919 {
920         sockent_t *se;
921         int status;
922
923         DEBUG ("buffer_len = %i", buffer_len);
924
925         for (se = sending_sockets; se != NULL; se = se->next)
926         {
927                 while (42)
928                 {
929                         status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */,
930                                         (struct sockaddr *) se->addr, se->addrlen);
931                         if (status < 0)
932                         {
933                                 char errbuf[1024];
934                                 if (errno == EINTR)
935                                         continue;
936                                 ERROR ("network plugin: sendto failed: %s",
937                                                 sstrerror (errno, errbuf,
938                                                         sizeof (errbuf)));
939                                 break;
940                         }
941
942                         break;
943                 } /* while (42) */
944         } /* for (sending_sockets) */
945 } /* void network_send_buffer */
946
947 static int add_to_buffer (char *buffer, int buffer_size,
948                 value_list_t *vl_def, char *type_def,
949                 const data_set_t *ds, const value_list_t *vl)
950 {
951         char *buffer_orig = buffer;
952
953         if (strcmp (vl_def->host, vl->host) != 0)
954         {
955                 if (write_part_string (&buffer, &buffer_size, TYPE_HOST,
956                                         vl->host, strlen (vl->host)) != 0)
957                         return (-1);
958                 strcpy (vl_def->host, vl->host);
959                 DEBUG ("host = %s", vl->host);
960         }
961
962         if (vl_def->time != vl->time)
963         {
964                 if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
965                                         (uint64_t) vl->time))
966                         return (-1);
967                 vl_def->time = vl->time;
968                 DEBUG ("time = %u", (unsigned int) vl->time);
969         }
970
971         if (strcmp (vl_def->plugin, vl->plugin) != 0)
972         {
973                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN,
974                                         vl->plugin, strlen (vl->plugin)) != 0)
975                         return (-1);
976                 strcpy (vl_def->plugin, vl->plugin);
977                 DEBUG ("plugin = %s", vl->plugin);
978         }
979
980         if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
981         {
982                 if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE,
983                                         vl->plugin_instance,
984                                         strlen (vl->plugin_instance)) != 0)
985                         return (-1);
986                 strcpy (vl_def->plugin_instance, vl->plugin_instance);
987                 DEBUG ("plugin_instance = %s", vl->plugin_instance);
988         }
989
990         if (strcmp (type_def, ds->type) != 0)
991         {
992                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE,
993                                         ds->type, strlen (ds->type)) != 0)
994                         return (-1);
995                 strcpy (type_def, ds->type);
996                 DEBUG ("type = %s", ds->type);
997         }
998
999         if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
1000         {
1001                 if (write_part_string (&buffer, &buffer_size, TYPE_TYPE_INSTANCE,
1002                                         vl->type_instance,
1003                                         strlen (vl->type_instance)) != 0)
1004                         return (-1);
1005                 strcpy (vl_def->type_instance, vl->type_instance);
1006                 DEBUG ("type_instance = %s", vl->type_instance);
1007         }
1008         
1009         if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
1010                 return (-1);
1011
1012         return (buffer - buffer_orig);
1013 } /* int add_to_buffer */
1014
1015 static void flush_buffer (void)
1016 {
1017         network_send_buffer (send_buffer, send_buffer_fill);
1018         send_buffer_ptr  = send_buffer;
1019         send_buffer_fill = 0;
1020         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1021         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1022 }
1023
1024 static int network_write (const data_set_t *ds, const value_list_t *vl)
1025 {
1026         int status;
1027
1028         pthread_mutex_lock (&send_buffer_lock);
1029
1030         status = add_to_buffer (send_buffer_ptr,
1031                         sizeof (send_buffer) - send_buffer_fill,
1032                         &send_buffer_vl, send_buffer_type,
1033                         ds, vl);
1034         if (status >= 0)
1035         {
1036                 /* status == bytes added to the buffer */
1037                 send_buffer_fill += status;
1038                 send_buffer_ptr  += status;
1039         }
1040         else
1041         {
1042                 flush_buffer ();
1043
1044                 status = add_to_buffer (send_buffer_ptr,
1045                                 sizeof (send_buffer) - send_buffer_fill,
1046                                 &send_buffer_vl, send_buffer_type,
1047                                 ds, vl);
1048
1049                 if (status >= 0)
1050                 {
1051                         send_buffer_fill += status;
1052                         send_buffer_ptr  += status;
1053                 }
1054         }
1055
1056         if (status < 0)
1057         {
1058                 ERROR ("network plugin: Unable to append to the "
1059                                 "buffer for some weird reason");
1060         }
1061         else if ((sizeof (send_buffer) - send_buffer_fill) < 15)
1062         {
1063                 flush_buffer ();
1064         }
1065
1066         pthread_mutex_unlock (&send_buffer_lock);
1067
1068         return ((status < 0) ? -1 : 0);
1069 } /* int network_write */
1070
1071 static int network_config (const char *key, const char *val)
1072 {
1073         char *node;
1074         char *service;
1075
1076         char *fields[3];
1077         int   fields_num;
1078
1079         if ((strcasecmp ("Listen", key) == 0)
1080                         || (strcasecmp ("Server", key) == 0))
1081         {
1082                 char *val_cpy = strdup (val);
1083                 if (val_cpy == NULL)
1084                         return (1);
1085
1086                 service = NET_DEFAULT_PORT;
1087                 fields_num = strsplit (val_cpy, fields, 3);
1088                 if ((fields_num != 1)
1089                                 && (fields_num != 2))
1090                         return (1);
1091                 else if (fields_num == 2)
1092                         service = fields[1];
1093                 node = fields[0];
1094
1095                 if (strcasecmp ("Listen", key) == 0)
1096                         network_add_listen_socket (node, service);
1097                 else
1098                         network_add_sending_socket (node, service);
1099         }
1100         else if (strcasecmp ("TimeToLive", key) == 0)
1101         {
1102                 int tmp = atoi (val);
1103                 if ((tmp > 0) && (tmp < 256))
1104                         network_config_ttl = tmp;
1105                 else
1106                         return (1);
1107         }
1108         else
1109         {
1110                 return (-1);
1111         }
1112         return (0);
1113 }
1114
1115 static int network_shutdown (void)
1116 {
1117         DEBUG ("Shutting down.");
1118
1119         listen_loop++;
1120
1121         if (listen_thread != (pthread_t) 0)
1122         {
1123                 pthread_kill (listen_thread, SIGTERM);
1124                 pthread_join (listen_thread, NULL /* no return value */);
1125                 listen_thread = (pthread_t) 0;
1126         }
1127
1128         listen_thread = 0;
1129
1130         /* TODO: Close `sending_sockets' */
1131
1132         plugin_unregister_config ("network");
1133         plugin_unregister_init ("network");
1134         plugin_unregister_write ("network");
1135         plugin_unregister_shutdown ("network");
1136
1137         return (0);
1138 } /* int network_shutdown */
1139
1140 static int network_init (void)
1141 {
1142         plugin_register_shutdown ("network", network_shutdown);
1143
1144         send_buffer_ptr  = send_buffer;
1145         send_buffer_fill = 0;
1146         memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
1147         memset (send_buffer_type, '\0', sizeof (send_buffer_type));
1148
1149         /* setup socket(s) and so on */
1150         if (sending_sockets != NULL)
1151                 plugin_register_write ("network", network_write);
1152
1153         if ((listen_sockets_num != 0) && (listen_thread == 0))
1154         {
1155                 int status;
1156
1157                 status = pthread_create (&listen_thread, NULL /* no attributes */,
1158                                 receive_thread, NULL /* no argument */);
1159
1160                 if (status != 0)
1161                 {
1162                         char errbuf[1024];
1163                         ERROR ("network: pthread_create failed: %s",
1164                                         sstrerror (errno, errbuf,
1165                                                 sizeof (errbuf)));
1166                 }
1167         }
1168         return (0);
1169 } /* int network_init */
1170
1171 void module_register (void)
1172 {
1173         plugin_register_config ("network", network_config,
1174                         config_keys, config_keys_num);
1175         plugin_register_init   ("network", network_init);
1176 }