Merge remote-tracking branch 'github/pr/1962'
[collectd.git] / src / zookeeper.c
1 /**
2  * collectd - src/zookeeper.c
3  * Copyright (C) 2014       Google, Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Jeremy Katz <jeremy at katzbox.net>
25  **/
26
#include "collectd.h"

#include "common.h"
#include "plugin.h"

#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <strings.h>
#include <sys/un.h>
36
37 #define ZOOKEEPER_DEF_HOST "127.0.0.1"
38 #define ZOOKEEPER_DEF_PORT "2181"
39
40 static char *zk_host = NULL;
41 static char *zk_port = NULL;
42
43 static const char *config_keys[] =
44 {
45         "Host",
46         "Port"
47 };
48 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
49
50 static int zookeeper_config(const char *key, const char *value)
51 {
52         if (strncmp(key, "Host", strlen("Host")) == 0)
53         {
54                 sfree (zk_host);
55                 zk_host = strdup (value);
56         }
57         else if (strncmp(key, "Port", strlen("Port")) == 0)
58         {
59                 sfree (zk_port);
60                 zk_port = strdup (value);
61         }
62         else
63         {
64                 return -1;
65         }
66         return 0;
67 }
68
69 static void zookeeper_submit_gauge (const char * type, const char * type_inst, gauge_t value)
70 {
71         value_list_t vl = VALUE_LIST_INIT;
72
73         vl.values = &(value_t) { .gauge = value };
74         vl.values_len = 1;
75         sstrncpy (vl.plugin, "zookeeper", sizeof (vl.plugin));
76         sstrncpy (vl.type, type, sizeof (vl.type));
77         if (type_inst != NULL)
78                 sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
79
80         plugin_dispatch_values (&vl);
81 } /* zookeeper_submit_gauge */
82
83 static void zookeeper_submit_derive (const char * type, const char * type_inst, derive_t value)
84 {
85         value_list_t vl = VALUE_LIST_INIT;
86
87         vl.values = &(value_t) { .derive = value };
88         vl.values_len = 1;
89         sstrncpy (vl.plugin, "zookeeper", sizeof (vl.plugin));
90         sstrncpy (vl.type, type, sizeof (vl.type));
91         if (type_inst != NULL)
92                 sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
93
94         plugin_dispatch_values (&vl);
95 } /* zookeeper_submit_derive */
96
/*
 * Opens a TCP connection to the configured ZooKeeper server.
 *
 * Host and port come from zk_host / zk_port, falling back to
 * ZOOKEEPER_DEF_HOST / ZOOKEEPER_DEF_PORT when unset. Every address returned
 * by getaddrinfo() is tried in order until one connects.
 *
 * Returns the connected socket descriptor, which the caller must close(), or
 * -1 if name resolution failed or no address could be connected.
 */
static int zookeeper_connect (void)
{
        int sk = -1;
        int status;
        struct addrinfo *ai_list = NULL;
        const char *host = (zk_host != NULL) ? zk_host : ZOOKEEPER_DEF_HOST;
        const char *port = (zk_port != NULL) ? zk_port : ZOOKEEPER_DEF_PORT;

        struct addrinfo ai_hints = {
                .ai_family   = AF_UNSPEC,
                .ai_socktype = SOCK_STREAM
        };

        status = getaddrinfo (host, port, &ai_hints, &ai_list);
        if (status != 0)
        {
                char errbuf[1024];
                INFO ("getaddrinfo failed: %s",
                          (status == EAI_SYSTEM)
                          ? sstrerror (errno, errbuf, sizeof (errbuf))
                          : gai_strerror (status));
                return (-1);
        }

        for (struct addrinfo *ai = ai_list; ai != NULL; ai = ai->ai_next)
        {
                sk = socket (ai->ai_family, SOCK_STREAM, 0);
                if (sk < 0)
                {
                        char errbuf[1024];
                        WARNING ("zookeeper: socket(2) failed: %s",
                                         sstrerror (errno, errbuf, sizeof(errbuf)));
                        continue;
                }

                status = (int) connect (sk, ai->ai_addr, ai->ai_addrlen);
                if (status != 0)
                {
                        char errbuf[1024];
                        /* Report the failure before close(): close() may
                         * overwrite errno and hide why connect() failed. */
                        WARNING ("zookeeper: connect(2) failed: %s",
                                         sstrerror (errno, errbuf, sizeof(errbuf)));
                        close (sk);
                        sk = -1;
                        continue;
                }

                /* connected */
                break;
        }

        freeaddrinfo(ai_list);
        return (sk);
} /* int zookeeper_connect */
152
/*
 * Sends the "mntr" command to the ZooKeeper server and stores the reply in
 * buffer as a NUL-terminated string.
 *
 * buffer      - destination for the reply; must not be NULL.
 * buffer_size - size of buffer in bytes; one byte is always reserved for the
 *               terminating NUL, so at most buffer_size - 1 bytes of the reply
 *               are stored.
 *
 * If the reply fills the buffer it may have been cut short. In that case a
 * warning is logged and everything after the last newline is dropped, so
 * that a half-received line is never handed to the caller.
 *
 * Returns 0 on success, -1 if the buffer is unusable, the connection or the
 * write failed, a read error occurred, or the server returned no data.
 */
static int zookeeper_query (char *buffer, size_t buffer_size)
{
        int sk, status;
        size_t buffer_fill = 0;
        size_t capacity;

        if ((buffer == NULL) || (buffer_size < 2))
        {
                ERROR ("zookeeper: Response buffer is too small.");
                return (-1);
        }
        /* Keep the final byte free for the terminating NUL. */
        capacity = buffer_size - 1;

        sk = zookeeper_connect();
        if (sk < 0)
        {
                ERROR ("zookeeper: Could not connect to daemon");
                return (-1);
        }

        status = (int) swrite (sk, "mntr\r\n", strlen("mntr\r\n"));
        if (status != 0)
        {
                char errbuf[1024];
                ERROR ("zookeeper: write(2) failed: %s",
                           sstrerror (errno, errbuf, sizeof (errbuf)));
                close (sk);
                return (-1);
        }

        memset (buffer, 0, buffer_size);

        /* Read until the peer closes the connection or the buffer is full. */
        while (buffer_fill < capacity)
        {
                ssize_t received = recv (sk, buffer + buffer_fill,
                                capacity - buffer_fill, /* flags = */ 0);

                if (received == 0)
                        break;

                if (received < 0)
                {
                        char errbuf[1024];
                        if ((errno == EAGAIN) || (errno == EINTR))
                                continue;
                        ERROR ("zookeeper: Error reading from socket: %s",
                                   sstrerror (errno, errbuf, sizeof (errbuf)));
                        close (sk);
                        return (-1);
                }

                buffer_fill += (size_t) received;
        } /* while (recv) */

        buffer[buffer_fill] = '\0';

        status = 0;
        if (buffer_fill == 0)
        {
                WARNING ("zookeeper: No data returned by MNTR command.");
                status = -1;
        }
        else if (buffer_fill >= capacity)
        {
                char *last_newline = strrchr (buffer, '\n');

                WARNING ("zookeeper: MNTR response filled the %zu byte buffer "
                                 "and may be truncated.", buffer_size);
                /* Drop a possibly incomplete trailing line. */
                if (last_newline != NULL)
                        last_newline[1] = '\0';
        }

        close(sk);
        return (status);
} /* int zookeeper_query */
205
206
207 static int zookeeper_read (void) {
208         char buf[4096];
209         char *ptr;
210         char *save_ptr;
211         char *line;
212         char *fields[2];
213
214         if (zookeeper_query (buf, sizeof (buf)) < 0)
215         {
216                 return (-1);
217         }
218
219         ptr = buf;
220         save_ptr = NULL;
221         while ((line = strtok_r (ptr, "\n\r", &save_ptr)) != NULL)
222         {
223                 ptr = NULL;
224                 if (strsplit(line, fields, 2) != 2)
225                 {
226                         continue;
227                 }
228 #define FIELD_CHECK(check, expected) \
229         (strncmp (check, expected, strlen(expected)) == 0)
230
231                 if (FIELD_CHECK (fields[0], "zk_avg_latency"))
232                 {
233                         zookeeper_submit_gauge ("latency", "avg", atol(fields[1]));
234                 }
235                 else if (FIELD_CHECK(fields[0], "zk_min_latency"))
236                 {
237                         zookeeper_submit_gauge ("latency", "min", atol(fields[1]));
238                 }
239                 else if (FIELD_CHECK (fields[0], "zk_max_latency"))
240                 {
241                         zookeeper_submit_gauge ("latency", "max", atol(fields[1]));
242                 }
243                 else if (FIELD_CHECK (fields[0], "zk_packets_received"))
244                 {
245                         zookeeper_submit_derive ("packets", "received", atol(fields[1]));
246                 }
247                 else if (FIELD_CHECK (fields[0], "zk_packets_sent"))
248                 {
249                         zookeeper_submit_derive ("packets", "sent", atol(fields[1]));
250                 }
251                 else if (FIELD_CHECK (fields[0], "zk_num_alive_connections"))
252                 {
253                         zookeeper_submit_gauge ("current_connections", NULL, atol(fields[1]));
254                 }
255                 else if (FIELD_CHECK (fields[0], "zk_outstanding_requests"))
256                 {
257                         zookeeper_submit_gauge ("requests", "outstanding", atol(fields[1]));
258                 }
259                 else if (FIELD_CHECK (fields[0], "zk_znode_count"))
260                 {
261                         zookeeper_submit_gauge ("gauge", "znode", atol(fields[1]));
262                 }
263                 else if (FIELD_CHECK (fields[0], "zk_watch_count"))
264                 {
265                         zookeeper_submit_gauge ("gauge", "watch", atol(fields[1]));
266                 }
267                 else if (FIELD_CHECK (fields[0], "zk_ephemerals_count"))
268                 {
269                         zookeeper_submit_gauge ("gauge", "ephemerals", atol(fields[1]));
270                 }
271                 else if (FIELD_CHECK (fields[0], "zk_ephemerals_count"))
272                 {
273                         zookeeper_submit_gauge ("gauge", "ephemerals", atol(fields[1]));
274                 }
275                 else if (FIELD_CHECK (fields[0], "zk_ephemerals_count"))
276                 {
277                         zookeeper_submit_gauge ("gauge", "ephemerals", atol(fields[1]));
278                 }
279                 else if (FIELD_CHECK (fields[0], "zk_approximate_data_size"))
280                 {
281                         zookeeper_submit_gauge ("bytes", "approximate_data_size", atol(fields[1]));
282                 }
283                 else if (FIELD_CHECK (fields[0], "zk_followers"))
284                 {
285                         zookeeper_submit_gauge ("count", "followers", atol(fields[1]));
286                 }
287                 else if (FIELD_CHECK (fields[0], "zk_synced_followers"))
288                 {
289                         zookeeper_submit_gauge ("count", "synced_followers", atol(fields[1]));
290                 }
291                 else if (FIELD_CHECK (fields[0], "zk_pending_syncs"))
292                 {
293                         zookeeper_submit_gauge ("count", "pending_syncs", atol(fields[1]));
294                 }
295                 else
296                 {
297                         DEBUG("Uncollected zookeeper MNTR field %s", fields[0]);
298                 }
299         }
300
301         return (0);
302 } /* zookeeper_read */
303
304 void module_register (void)
305 {
306         plugin_register_config ("zookeeper", zookeeper_config, config_keys, config_keys_num);
307         plugin_register_read ("zookeeper", zookeeper_read);
308 } /* void module_register */