connectivity plugin initial commit
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <net/if.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <libmnl/libmnl.h>
45 #include <linux/netlink.h>
46 #include <linux/rtnetlink.h>
47
48 #define MYPROTO NETLINK_ROUTE
49
50 /*
51  * Private data types
52  */
53 struct interfacelist_s {
54   char *interface;
55
56   uint32_t status;
57   uint32_t prev_status;
58   uint32_t sent;
59   uint32_t sec;
60   uint32_t usec;
61
62   struct interfacelist_s *next;
63 };
64 typedef struct interfacelist_s interfacelist_t;
65
66 /*
67  * Private variables
68  */
69 static interfacelist_t *interfacelist_head = NULL;
70
71 static int connectivity_thread_loop = 0;
72 static int connectivity_thread_error = 0;
73 static pthread_t connectivity_thread_id;
74 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
75 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
76 static struct mnl_socket *sock;
77
78 static const char *config_keys[] = {"Interface"};
79 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
80
81 /*
82  * Private functions
83  */
84
85 static int connectivity_link_state(struct nlmsghdr *msg) {
86   int retval = 0;
87   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
88   struct nlattr *attr;
89   const char *dev = NULL;
90
91   pthread_mutex_lock(&connectivity_lock);
92
93   interfacelist_t *il;
94
95   /* Scan attribute list for device name. */
96   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
97     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
98       continue;
99
100     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
101       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
102             "mnl_attr_validate "
103             "failed.");
104       pthread_mutex_unlock(&connectivity_lock);
105       return MNL_CB_ERROR;
106     }
107
108     dev = mnl_attr_get_str(attr);
109
110     for (il = interfacelist_head; il != NULL; il = il->next)
111       if (strcmp(dev, il->interface) == 0)
112         break;
113
114     if (il == NULL) {
115       INFO("connectivity plugin: Ignoring link state change for unmonitored "
116            "interface: %s",
117            dev);
118     } else {
119       uint32_t prev_status;
120       struct timeval tv;
121
122       gettimeofday(&tv, NULL);
123
124       unsigned long long millisecondsSinceEpoch =
125           (unsigned long long)(tv.tv_sec) * 1000 +
126           (unsigned long long)(tv.tv_usec) / 1000;
127
128       INFO("connectivity plugin (%llu): Interface %s status is now %s",
129            millisecondsSinceEpoch, dev,
130            ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
131       prev_status = il->status;
132       il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0);
133       il->sec = tv.tv_sec;
134       il->usec = tv.tv_usec;
135       // If the new status is different than the previous status,
136       // store the previous status and set sent to zero
137       if (il->status != prev_status) {
138         il->prev_status = prev_status;
139         il->sent = 0;
140       }
141     }
142
143     // no need to loop again, we found the interface name
144     // (otherwise the first if-statement in the loop would
145     // have moved us on with 'continue')
146     break;
147   }
148
149   pthread_mutex_unlock(&connectivity_lock);
150
151   return retval;
152 }
153
154 static int msg_handler(struct nlmsghdr *msg) {
155   switch (msg->nlmsg_type) {
156   case RTM_NEWADDR:
157     break;
158   case RTM_DELADDR:
159     break;
160   case RTM_NEWROUTE:
161     break;
162   case RTM_DELROUTE:
163     break;
164   case RTM_NEWLINK:
165     connectivity_link_state(msg);
166     break;
167   case RTM_DELLINK:
168     break;
169   default:
170     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
171           msg->nlmsg_type);
172     break;
173   }
174   return 0;
175 }
176
177 static int read_event(struct mnl_socket *nl,
178                       int (*msg_handler)(struct nlmsghdr *)) {
179   int status;
180   int ret = 0;
181   char buf[4096];
182   struct nlmsghdr *h;
183
184   if (nl == NULL)
185     return ret;
186
187   status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
188
189   if (status < 0) {
190     /* Socket non-blocking so bail out once we have read everything */
191     if (errno == EWOULDBLOCK || errno == EAGAIN)
192       return ret;
193
194     /* Anything else is an error */
195     ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
196           status);
197     return status;
198   }
199
200   if (status == 0) {
201     DEBUG("connectivity plugin: read_event: EOF\n");
202   }
203
204   /* We need to handle more than one message per 'recvmsg' */
205   for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
206        h = NLMSG_NEXT(h, status)) {
207     /* Finish reading */
208     if (h->nlmsg_type == NLMSG_DONE)
209       return ret;
210
211     /* Message is some kind of error */
212     if (h->nlmsg_type == NLMSG_ERROR) {
213       ERROR("connectivity plugin: read_event: Message is an error - decode "
214             "TBD\n");
215       return -1; // Error
216     }
217
218     /* Call message handler */
219     if (msg_handler) {
220       ret = (*msg_handler)(h);
221       if (ret < 0) {
222         ERROR("connectivity plugin: read_event: Message handler error %d\n",
223               ret);
224         return ret;
225       }
226     } else {
227       ERROR("connectivity plugin: read_event: Error NULL message handler\n");
228       return -1;
229     }
230   }
231
232   return ret;
233 }
234
235 static void *connectivity_thread(void *arg) /* {{{ */
236 {
237   pthread_mutex_lock(&connectivity_lock);
238
239   while (connectivity_thread_loop > 0) {
240     int status;
241
242     pthread_mutex_unlock(&connectivity_lock);
243
244     status = read_event(sock, msg_handler);
245
246     pthread_mutex_lock(&connectivity_lock);
247
248     if (status < 0) {
249       connectivity_thread_error = 1;
250       break;
251     }
252
253     if (connectivity_thread_loop <= 0)
254       break;
255   } /* while (connectivity_thread_loop > 0) */
256
257   pthread_mutex_unlock(&connectivity_lock);
258
259   return ((void *)0);
260 } /* }}} void *connectivity_thread */
261
262 static int start_thread(void) /* {{{ */
263 {
264   int status;
265
266   pthread_mutex_lock(&connectivity_lock);
267
268   if (connectivity_thread_loop != 0) {
269     pthread_mutex_unlock(&connectivity_lock);
270     return (0);
271   }
272
273   connectivity_thread_loop = 1;
274   connectivity_thread_error = 0;
275
276   if (sock == NULL) {
277     sock = mnl_socket_open(NETLINK_ROUTE);
278     if (sock == NULL) {
279       ERROR(
280           "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
281       pthread_mutex_unlock(&connectivity_lock);
282       return (-1);
283     }
284
285     if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
286       ERROR(
287           "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
288       pthread_mutex_unlock(&connectivity_lock);
289       return (1);
290     }
291   }
292
293   status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
294                                 connectivity_thread,
295                                 /* arg = */ (void *)0, "connectivity");
296   if (status != 0) {
297     connectivity_thread_loop = 0;
298     ERROR("connectivity plugin: Starting thread failed.");
299     pthread_mutex_unlock(&connectivity_lock);
300     mnl_socket_close(sock);
301     return (-1);
302   }
303
304   pthread_mutex_unlock(&connectivity_lock);
305   return (0);
306 } /* }}} int start_thread */
307
308 static int stop_thread(int shutdown) /* {{{ */
309 {
310   int status;
311
312   if (sock != NULL)
313     mnl_socket_close(sock);
314
315   pthread_mutex_lock(&connectivity_lock);
316
317   if (connectivity_thread_loop == 0) {
318     pthread_mutex_unlock(&connectivity_lock);
319     return (-1);
320   }
321
322   connectivity_thread_loop = 0;
323   pthread_cond_broadcast(&connectivity_cond);
324   pthread_mutex_unlock(&connectivity_lock);
325
326   if (shutdown == 1) {
327     // Since the thread is blocking, calling pthread_join
328     // doesn't actually succeed in stopping it.  It will stick around
329     // until a NETLINK message is received on the socket (at which
330     // it will realize that "connectivity_thread_loop" is 0 and will
331     // break out of the read loop and be allowed to die).  This is
332     // fine when the process isn't supposed to be exiting, but in
333     // the case of a process shutdown, we don't want to have an
334     // idle thread hanging around.  Calling pthread_cancel here in
335     // the case of a shutdown is just assures that the thread is
336     // gone and that the process has been fully terminated.
337
338     INFO("connectivity plugin: Canceling thread for process shutdown");
339
340     status = pthread_cancel(connectivity_thread_id);
341
342     if (status != 0) {
343       ERROR("connectivity plugin: Unable to cancel thread: %d", status);
344       status = -1;
345     }
346   } else {
347     status = pthread_join(connectivity_thread_id, /* return = */ NULL);
348     if (status != 0) {
349       ERROR("connectivity plugin: Stopping thread failed.");
350       status = -1;
351     }
352   }
353
354   pthread_mutex_lock(&connectivity_lock);
355   memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
356   connectivity_thread_error = 0;
357   pthread_mutex_unlock(&connectivity_lock);
358
359   INFO("connectivity plugin: Finished requesting stop of thread");
360
361   return (status);
362 } /* }}} int stop_thread */
363
364 static int connectivity_init(void) /* {{{ */
365 {
366   if (interfacelist_head == NULL) {
367     NOTICE("connectivity plugin: No interfaces have been configured.");
368     return (-1);
369   }
370
371   return (start_thread());
372 } /* }}} int connectivity_init */
373
374 static int connectivity_config(const char *key, const char *value) /* {{{ */
375 {
376   if (strcasecmp(key, "Interface") == 0) {
377     interfacelist_t *il;
378     char *interface;
379
380     il = malloc(sizeof(*il));
381     if (il == NULL) {
382       char errbuf[1024];
383       ERROR("connectivity plugin: malloc failed during connectivity_config: %s",
384             sstrerror(errno, errbuf, sizeof(errbuf)));
385       return (1);
386     }
387
388     interface = strdup(value);
389     if (interface == NULL) {
390       char errbuf[1024];
391       sfree(il);
392       ERROR("connectivity plugin: strdup failed connectivity_config: %s",
393             sstrerror(errno, errbuf, sizeof(errbuf)));
394       return (1);
395     }
396
397     il->interface = interface;
398     il->status = 2; // "unknown"
399     il->prev_status = 2;
400     il->sent = 0;
401     il->next = interfacelist_head;
402     interfacelist_head = il;
403
404   } else {
405     return (-1);
406   }
407
408   return (0);
409 } /* }}} int connectivity_config */
410
411 static void submit(const char *interface, const char *type, /* {{{ */
412                    gauge_t value, uint32_t sec, uint32_t usec) {
413   value_list_t vl = VALUE_LIST_INIT;
414   char hostname[1024];
415   vl.values = &(value_t){.gauge = value};
416   vl.values_len = 1;
417   sstrncpy(vl.plugin, "connectivity", sizeof(vl.plugin));
418   sstrncpy(vl.type_instance, interface, sizeof(vl.type_instance));
419   sstrncpy(vl.type, type, sizeof(vl.type));
420
421   // Create metadata to store JSON key-values
422   meta_data_t *meta = meta_data_create();
423
424   vl.meta = meta;
425   // For latency measurement
426   struct timeval tv;
427   gettimeofday(&tv, NULL);
428   gethostname(hostname, sizeof(hostname));
429   char strSec[11];
430   char struSec[11];
431   snprintf(strSec, sizeof strSec, "%" PRIu32, sec);
432   snprintf(struSec, sizeof struSec, "%" PRIu32, usec);
433   if (value == 1) {
434     meta_data_add_string(meta, "condition", "interface_up");
435     meta_data_add_string(meta, "entity", interface);
436     meta_data_add_string(meta, "source", hostname);
437     meta_data_add_string(meta, "sec", strSec);
438     meta_data_add_string(meta, "usec", struSec);
439     meta_data_add_string(meta, "dest", "interface_down");
440   } else {
441     meta_data_add_string(meta, "condition", "interface_down");
442     meta_data_add_string(meta, "entity", interface);
443     meta_data_add_string(meta, "source", hostname);
444     meta_data_add_string(meta, "sec", strSec);
445     meta_data_add_string(meta, "usec", struSec);
446     meta_data_add_string(meta, "dest", "interface_up");
447   }
448
449   plugin_dispatch_values(&vl);
450 } /* }}} void interface_submit */
451
452 static int connectivity_read(void) /* {{{ */
453 {
454   if (connectivity_thread_error != 0) {
455     ERROR("connectivity plugin: The interface thread had a problem. Restarting "
456           "it.");
457
458     stop_thread(0);
459
460     for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) {
461       il->status = 2; // signifies "unknown"
462       il->prev_status = 2;
463       il->sent = 0;
464     }
465
466     start_thread();
467
468     return (-1);
469   } /* if (connectivity_thread_error != 0) */
470
471   for (interfacelist_t *il = interfacelist_head; il != NULL;
472        il = il->next) /* {{{ */
473   {
474     uint32_t status;
475     uint32_t prev_status;
476     uint32_t sent;
477
478     /* Locking here works, because the structure of the linked list is only
479      * changed during configure and shutdown. */
480     pthread_mutex_lock(&connectivity_lock);
481
482     status = il->status;
483     prev_status = il->prev_status;
484     sent = il->sent;
485
486     if (status != prev_status && sent == 0) {
487       submit(il->interface, "gauge", status, il->sec, il->usec);
488
489       il->sent = 1;
490     }
491
492     pthread_mutex_unlock(&connectivity_lock);
493   } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */
494
495   return (0);
496 } /* }}} int connectivity_read */
497
498 static int connectivity_shutdown(void) /* {{{ */
499 {
500   interfacelist_t *il;
501
502   INFO("connectivity plugin: Shutting down thread.");
503   if (stop_thread(1) < 0)
504     return (-1);
505
506   il = interfacelist_head;
507   while (il != NULL) {
508     interfacelist_t *il_next;
509
510     il_next = il->next;
511
512     sfree(il->interface);
513     sfree(il);
514
515     il = il_next;
516   }
517
518   return (0);
519 } /* }}} int connectivity_shutdown */
520
521 void module_register(void) {
522   plugin_register_config("connectivity", connectivity_config, config_keys,
523                          config_keys_num);
524   plugin_register_init("connectivity", connectivity_init);
525   plugin_register_read("connectivity", connectivity_read);
526   plugin_register_shutdown("connectivity", connectivity_shutdown);
527 } /* void module_register */