libcollectdclient: Introduce a "parser" callback.
[collectd.git] / src / libcollectdclient / server.c
1 /**
2  * Copyright 2017 Florian Forster
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20  * SOFTWARE.
21  *
22  * Authors:
23  *   Florian octo Forster <octo at collectd.org>
24  **/
25
26 #if HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #if !defined(__GNUC__) || !__GNUC__
31 #define __attribute__(x) /**/
32 #endif
33
34 #include "collectd/lcc_features.h"
35 #include "collectd/network_parse.h" /* for lcc_network_parse_options_t */
36 #include "collectd/server.h"
37
38 #include <errno.h>
39 #include <net/if.h>
40 #include <netdb.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <sys/types.h>
44 #include <unistd.h>
45
46 #include <stdio.h>
47 #define DEBUG(...) printf(__VA_ARGS__)
48
49 static _Bool is_multicast(struct addrinfo const *ai) {
50   if (ai->ai_family == AF_INET) {
51     struct sockaddr_in *addr = (struct sockaddr_in *)ai->ai_addr;
52     return IN_MULTICAST(ntohl(addr->sin_addr.s_addr));
53   } else if (ai->ai_family == AF_INET6) {
54     struct sockaddr_in6 *addr = (struct sockaddr_in6 *)ai->ai_addr;
55     return IN6_IS_ADDR_MULTICAST(&addr->sin6_addr);
56   }
57   return 0;
58 }
59
60 static int server_multicast_join(lcc_listener_t *srv,
61                                  struct sockaddr_storage *group, int loop_back,
62                                  int ttl) {
63   if (group->ss_family == AF_INET) {
64     struct sockaddr_in *sa = (struct sockaddr_in *)group;
65
66     int status = setsockopt(srv->conn, IPPROTO_IP, IP_MULTICAST_LOOP,
67                             &loop_back, sizeof(loop_back));
68     if (status == -1) {
69       DEBUG("setsockopt(IP_MULTICAST_LOOP, %d) = %d\n", loop_back, errno);
70       return errno;
71     }
72
73     status =
74         setsockopt(srv->conn, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
75     if (status == -1)
76       return errno;
77
78 #if HAVE_STRUCT_IP_MREQN_IMR_IFINDEX
79     struct ip_mreqn mreq = {
80         .imr_address.s_addr = INADDR_ANY,
81         .imr_multiaddr.s_addr = sa->sin_addr.s_addr,
82         .imr_ifindex = if_nametoindex(srv->interface),
83     };
84 #else
85     struct ip_mreq mreq = {
86         .imr_address.s_addr = INADDR_ANY, .imr_multiaddr.s_addr = sa->s_addr,
87     };
88 #endif
89     status = setsockopt(srv->conn, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
90                         sizeof(mreq));
91     if (status == -1)
92       return errno;
93   } else if (group->ss_family == AF_INET6) {
94     struct sockaddr_in6 *sa = (struct sockaddr_in6 *)group;
95
96     int status = setsockopt(srv->conn, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
97                             &loop_back, sizeof(loop_back));
98     if (status == -1)
99       return errno;
100
101     status = setsockopt(srv->conn, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &ttl,
102                         sizeof(ttl));
103     if (status == -1)
104       return errno;
105
106     struct ipv6_mreq mreq6 = {
107         .ipv6mr_interface = if_nametoindex(srv->interface),
108     };
109     memmove(&mreq6.ipv6mr_multiaddr, &sa->sin6_addr, sizeof(struct in6_addr));
110
111     status = setsockopt(srv->conn, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6,
112                         sizeof(mreq6));
113     if (status == -1)
114       return errno;
115   } else {
116     return EINVAL;
117   }
118
119   return 0;
120 }
121
122 static int server_bind_socket(lcc_listener_t *srv, struct addrinfo const *ai) {
123   /* allow multiple sockets to use the same PORT number */
124   if (setsockopt(srv->conn, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) ==
125       -1) {
126     return errno;
127   }
128
129   if (bind(srv->conn, ai->ai_addr, ai->ai_addrlen) == -1) {
130     return -1;
131   }
132
133   if (is_multicast(ai)) {
134     int status = server_multicast_join(srv, (void *)ai->ai_addr, /* loop = */ 1,
135                                        /* ttl = */ 16);
136     if (status != 0)
137       return status;
138   }
139
140   return 0;
141 }
142
143 static int server_open(lcc_listener_t *srv) {
144   struct addrinfo *res = NULL;
145   int status = getaddrinfo(srv->node ? srv->node : "::",
146                            srv->service ? srv->service : LCC_DEFAULT_PORT,
147                            &(struct addrinfo){
148                                .ai_flags = AI_ADDRCONFIG,
149                                .ai_family = AF_UNSPEC,
150                                .ai_socktype = SOCK_DGRAM,
151                            },
152                            &res);
153   if (status != 0)
154     return status;
155
156   for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
157     srv->conn = socket(ai->ai_family, ai->ai_socktype, 0);
158     if (srv->conn == -1)
159       continue;
160
161     status = server_bind_socket(srv, ai);
162     if (status != 0) {
163       close(srv->conn);
164       srv->conn = -1;
165       continue;
166     }
167
168     break;
169   }
170
171   freeaddrinfo(res);
172
173   if (srv->conn >= 0)
174     return 0;
175   return status != 0 ? status : -1;
176 }
177
178 int lcc_listen_and_write(lcc_listener_t srv) {
179   _Bool close_socket = 0;
180
181   if (srv.conn < 0) {
182     int status = server_open(&srv);
183     if (status != 0)
184       return status;
185     close_socket = 1;
186   }
187
188   if (srv.buffer_size == 0)
189     /* TODO(octo): this should be a define. */
190     srv.buffer_size = 1452;
191
192   if (srv.parser == NULL)
193     srv.parser = lcc_network_parse;
194
195   int ret = 0;
196   while (42) {
197     char buffer[srv.buffer_size];
198     ssize_t len = recv(srv.conn, buffer, sizeof(buffer), /* flags = */ 0);
199     if (len == -1) {
200       ret = errno;
201       break;
202     } else if (len == 0) {
203       break;
204     }
205
206     (void)srv.parser(buffer, (size_t)len,
207                      (lcc_network_parse_options_t){
208                          .writer = srv.writer,
209                          .password_lookup = srv.password_lookup,
210                          .security_level = srv.security_level,
211                      });
212   }
213
214   if (close_socket) {
215     close(srv.conn);
216     srv.conn = -1;
217   }
218
219   return ret;
220 }