d88f6d95edbbbecc5c46f6189f6168244ee7d64a
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33 #include "utils_ignorelist.h"
34
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57
58 #define MYPROTO NETLINK_ROUTE
59
60 #define LINK_STATE_DOWN 0
61 #define LINK_STATE_UP 1
62 #define LINK_STATE_UNKNOWN 2
63
64 #define CONNECTIVITY_DOMAIN_FIELD "domain"
65 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
66 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
67 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
68 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
69 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
70 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
71 #define CONNECTIVITY_PRIORITY_FIELD "priority"
72 #define CONNECTIVITY_PRIORITY_VALUE "high"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
74 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
75 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
76 #define CONNECTIVITY_SEQUENCE_VALUE "0"
77 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
78 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
79 #define CONNECTIVITY_VERSION_FIELD "version"
80 #define CONNECTIVITY_VERSION_VALUE "1.0"
81
82 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
83 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
84 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
85 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
86 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
87 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
89 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
90   "stateChangeFieldsVersion"
91 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
92 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
93
94 /*
95  * Private data types
96  */
97
98 struct interface_list_s {
99   char *interface;
100
101   uint32_t status;
102   uint32_t prev_status;
103   uint32_t sent;
104   cdtime_t timestamp;
105
106   struct interface_list_s *next;
107 };
108 typedef struct interface_list_s interface_list_t;
109
110 /*
111  * Private variables
112  */
113
114 static ignorelist_t *ignorelist = NULL;
115
116 static interface_list_t *interface_list_head = NULL;
117 static int monitor_all_interfaces = 1;
118
119 static int connectivity_netlink_thread_loop = 0;
120 static int connectivity_netlink_thread_error = 0;
121 static pthread_t connectivity_netlink_thread_id;
122 static int connectivity_dequeue_thread_loop = 0;
123 static pthread_t connectivity_dequeue_thread_id;
124 static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
125 static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
126 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
127 static int nl_sock = -1;
128 static int event_id = 0;
129 static int statuses_to_send = 0;
130
131 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
132 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
133
134 /*
135  * Private functions
136  */
137
138 static int gen_message_payload(int state, int old_state, const char *interface,
139                                cdtime_t timestamp, char **buf) {
140   const unsigned char *buf2;
141   yajl_gen g;
142   char json_str[DATA_MAX_NAME_LEN];
143
144 #if !defined(HAVE_YAJL_V2)
145   yajl_gen_config conf = {0};
146 #endif
147
148 #if HAVE_YAJL_V2
149   size_t len;
150   g = yajl_gen_alloc(NULL);
151   yajl_gen_config(g, yajl_gen_beautify, 0);
152 #else
153   unsigned int len;
154   g = yajl_gen_alloc(&conf, NULL);
155 #endif
156
157   yajl_gen_clear(g);
158
159   // *** BEGIN common event header ***
160
161   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
162     goto err;
163
164   // domain
165   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
166                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
167     goto err;
168
169   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
170                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
171     goto err;
172
173   // eventId
174   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
175                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
176       yajl_gen_status_ok)
177     goto err;
178
179   event_id = event_id + 1;
180   if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
181     goto err;
182   }
183
184   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
185     goto err;
186   }
187
188   // eventName
189   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
190                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
191       yajl_gen_status_ok)
192     goto err;
193
194   if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
195                (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
196                            : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
197     goto err;
198   }
199
200   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
201       yajl_gen_status_ok) {
202     goto err;
203   }
204
205   // lastEpochMicrosec
206   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
207                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
208       yajl_gen_status_ok)
209     goto err;
210
211   if (snprintf(json_str, sizeof(json_str), "%lu", CDTIME_T_TO_US(cdtime())) <
212       0) {
213     goto err;
214   }
215
216   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
217     goto err;
218   }
219
220   // priority
221   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
222                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
223       yajl_gen_status_ok)
224     goto err;
225
226   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
227                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
228       yajl_gen_status_ok)
229     goto err;
230
231   // reportingEntityName
232   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
233                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
234       yajl_gen_status_ok)
235     goto err;
236
237   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
238                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
239       yajl_gen_status_ok)
240     goto err;
241
242   // sequence
243   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
244                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
245       yajl_gen_status_ok)
246     goto err;
247
248   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
249                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
250       yajl_gen_status_ok)
251     goto err;
252
253   // sourceName
254   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
255                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
256       yajl_gen_status_ok)
257     goto err;
258
259   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
260       yajl_gen_status_ok)
261     goto err;
262
263   // startEpochMicrosec
264   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
265                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
266       yajl_gen_status_ok)
267     goto err;
268
269   if (snprintf(json_str, sizeof(json_str), "%lu", timestamp) < 0) {
270     goto err;
271   }
272
273   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
274     goto err;
275   }
276
277   // version
278   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
279                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
280     goto err;
281
282   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
283                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
284     goto err;
285
286   // *** END common event header ***
287
288   // *** BEGIN state change fields ***
289
290   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
291                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
292       yajl_gen_status_ok)
293     goto err;
294
295   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
296     goto err;
297
298   // newState
299   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
300                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
301       yajl_gen_status_ok)
302     goto err;
303
304   int new_state_len =
305       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
306                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
307
308   if (yajl_gen_string(
309           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
310                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
311           new_state_len) != yajl_gen_status_ok)
312     goto err;
313
314   // oldState
315   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
316                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
317       yajl_gen_status_ok)
318     goto err;
319
320   int old_state_len =
321       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
322                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
323
324   if (yajl_gen_string(
325           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
326                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
327           old_state_len) != yajl_gen_status_ok)
328     goto err;
329
330   // stateChangeFieldsVersion
331   if (yajl_gen_string(g,
332                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
333                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
334       yajl_gen_status_ok)
335     goto err;
336
337   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
338                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
339       yajl_gen_status_ok)
340     goto err;
341
342   // stateInterface
343   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
344                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
345       yajl_gen_status_ok)
346     goto err;
347
348   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
349       yajl_gen_status_ok)
350     goto err;
351
352   // close state change and header fields
353   if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
354       yajl_gen_map_close(g) != yajl_gen_status_ok)
355     goto err;
356
357   // *** END state change fields ***
358
359   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
360     goto err;
361
362   *buf = strdup((char *)buf2);
363
364   if (*buf == NULL) {
365     ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
366           STRERRNO);
367     goto err;
368   }
369
370   yajl_gen_free(g);
371
372   return 0;
373
374 err:
375   yajl_gen_free(g);
376   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
377   return -1;
378 }
379
380 static interface_list_t *add_interface(const char *interface, int status,
381                                        int prev_status) {
382   interface_list_t *il = calloc(1, sizeof(*il));
383
384   if (il == NULL) {
385     ERROR("connectivity plugin: calloc failed during add_interface: %s",
386           STRERRNO);
387     return NULL;
388   }
389
390   char *interface2 = strdup(interface);
391   if (interface2 == NULL) {
392     sfree(il);
393     ERROR("connectivity plugin: strdup failed during add_interface: %s",
394           STRERRNO);
395     return NULL;
396   }
397
398   il->interface = interface2;
399   il->status = status;
400   il->prev_status = prev_status;
401   il->timestamp = CDTIME_T_TO_US(cdtime());
402   il->sent = 0;
403   il->next = interface_list_head;
404   interface_list_head = il;
405
406   DEBUG("connectivity plugin: added interface %s", interface2);
407
408   return il;
409 }
410
411 static int connectivity_link_state(struct nlmsghdr *msg) {
412   pthread_mutex_lock(&connectivity_data_lock);
413
414   struct nlattr *attr;
415   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
416
417   /* Scan attribute list for device name. */
418   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
419     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
420       continue;
421
422     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
423       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
424             "mnl_attr_validate "
425             "failed.");
426       pthread_mutex_unlock(&connectivity_data_lock);
427       return MNL_CB_ERROR;
428     }
429
430     const char *dev = mnl_attr_get_str(attr);
431
432     // Check the list of interfaces we should monitor, if we've chosen
433     // a subset.  If we don't care about this one, abort.
434     if (ignorelist_match(ignorelist, dev) != 0) {
435       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
436             "interface: %s",
437             dev);
438       break;
439     }
440
441     interface_list_t *il = NULL;
442
443     for (il = interface_list_head; il != NULL; il = il->next)
444       if (strcmp(dev, il->interface) == 0)
445         break;
446
447     if (il == NULL) {
448       // We haven't encountered this interface yet, so add it to the linked list
449       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
450
451       if (il == NULL) {
452         ERROR("connectivity plugin: unable to add interface %s during "
453               "connectivity_link_state",
454               dev);
455         return MNL_CB_ERROR;
456       }
457     }
458
459     uint32_t prev_status;
460
461     prev_status = il->status;
462     il->status =
463         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
464     il->timestamp = CDTIME_T_TO_US(cdtime());
465
466     // If the new status is different than the previous status,
467     // store the previous status and set sent to zero, and set the
468     // global flag to indicate there are statuses to dispatch
469     if (il->status != prev_status) {
470       il->prev_status = prev_status;
471       il->sent = 0;
472       statuses_to_send = 1;
473     }
474
475     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
476           il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
477
478     // no need to loop again, we found the interface name attr
479     // (otherwise the first if-statement in the loop would
480     // have moved us on with 'continue')
481     break;
482   }
483
484   pthread_mutex_unlock(&connectivity_data_lock);
485
486   return 0;
487 }
488
489 static int msg_handler(struct nlmsghdr *msg) {
490   switch (msg->nlmsg_type) {
491   case RTM_NEWADDR:
492   case RTM_DELADDR:
493   case RTM_NEWROUTE:
494   case RTM_DELROUTE:
495   case RTM_DELLINK:
496     // Not of interest in current version
497     break;
498   case RTM_NEWLINK:
499     connectivity_link_state(msg);
500     break;
501   default:
502     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d",
503           msg->nlmsg_type);
504     break;
505   }
506   return 0;
507 }
508
509 static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
510   int ret = 0;
511   int recv_flags = MSG_DONTWAIT;
512
513   if (nl == -1)
514     return ret;
515
516   while (42) {
517     pthread_mutex_lock(&connectivity_threads_lock);
518
519     if (connectivity_netlink_thread_loop <= 0) {
520       pthread_mutex_unlock(&connectivity_threads_lock);
521       return ret;
522     }
523
524     pthread_mutex_unlock(&connectivity_threads_lock);
525
526     char buf[4096];
527     int status = recv(nl, buf, sizeof(buf), recv_flags);
528
529     if (status < 0) {
530
531       // If there were no more messages to drain from the socket,
532       // then signal the dequeue thread and allow it to dispatch
533       // any saved interface status changes.  Then continue, but
534       // block and wait for new messages
535       if (errno == EWOULDBLOCK || errno == EAGAIN) {
536         pthread_cond_signal(&connectivity_cond);
537
538         recv_flags = 0;
539         continue;
540       }
541
542       if (errno == EINTR) {
543         // Interrupt, so just continue and try again
544         continue;
545       }
546
547       /* Anything else is an error */
548       ERROR("connectivity plugin: read_event: Error recv: %d", status);
549       return status;
550     }
551
552     // Message received successfully, so we'll stop blocking on the
553     // receive call for now (until we get a "would block" error, which
554     // will be handled above)
555     recv_flags = MSG_DONTWAIT;
556
557     if (status == 0) {
558       DEBUG("connectivity plugin: read_event: EOF");
559     }
560
561     /* We need to handle more than one message per 'recvmsg' */
562     for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
563          NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
564       /* Finish reading */
565       if (h->nlmsg_type == NLMSG_DONE)
566         return ret;
567
568       /* Message is some kind of error */
569       if (h->nlmsg_type == NLMSG_ERROR) {
570         ERROR("connectivity plugin: read_event: Message is an error");
571         return -1; // Error
572       }
573
574       /* Call message handler */
575       if (msg_handler) {
576         ret = (*msg_handler)(h);
577         if (ret < 0) {
578           ERROR("connectivity plugin: read_event: Message handler error %d",
579                 ret);
580           return ret;
581         }
582       } else {
583         ERROR("connectivity plugin: read_event: Error NULL message handler");
584         return -1;
585       }
586     }
587   }
588
589   return ret;
590 }
591
592 static void connectivity_dispatch_notification(const char *interface,
593                                                const char *type, gauge_t value,
594                                                gauge_t old_value,
595                                                cdtime_t timestamp) {
596
597   notification_t n = {(value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
598                       cdtime(),
599                       "",
600                       "",
601                       "connectivity",
602                       "",
603                       "",
604                       "",
605                       NULL};
606
607   sstrncpy(n.host, hostname_g, sizeof(n.host));
608   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
609   sstrncpy(n.type, "gauge", sizeof(n.type));
610   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
611
612   char *buf = NULL;
613
614   gen_message_payload(value, old_value, interface, timestamp, &buf);
615
616   notification_meta_t *m = calloc(1, sizeof(*m));
617
618   if (m == NULL) {
619     sfree(buf);
620     ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO);
621     return;
622   }
623
624   sstrncpy(m->name, "ves", sizeof(m->name));
625   m->nm_value.nm_string = sstrdup(buf);
626   m->type = NM_TYPE_STRING;
627   n.meta = m;
628
629   DEBUG("connectivity plugin: notification message: %s",
630         n.meta->nm_value.nm_string);
631
632   DEBUG("connectivity plugin: dispatching state %d for interface %s",
633         (int)value, interface);
634
635   plugin_dispatch_notification(&n);
636   plugin_notification_meta_free(n.meta);
637
638   // strdup'd in gen_message_payload
639   if (buf != NULL)
640     sfree(buf);
641 }
642
643 // NOTE: Caller MUST hold connectivity_data_lock when calling this function
644 static void send_interface_status() {
645   for (interface_list_t *il = interface_list_head; il != NULL;
646        il = il->next) /* {{{ */
647   {
648     uint32_t status = il->status;
649     uint32_t prev_status = il->prev_status;
650     uint32_t sent = il->sent;
651
652     if (status != prev_status && sent == 0) {
653       connectivity_dispatch_notification(il->interface, "gauge", status,
654                                          prev_status, il->timestamp);
655       il->sent = 1;
656     }
657   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
658
659   statuses_to_send = 0;
660 }
661
662 static void read_interface_status() /* {{{ */
663 {
664   pthread_mutex_lock(&connectivity_data_lock);
665
666   // If we don't have any interface statuses to dispatch,
667   // then we wait until signalled
668   if (!statuses_to_send)
669     pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
670
671   send_interface_status();
672
673   pthread_mutex_unlock(&connectivity_data_lock);
674 } /* }}} int *read_interface_status */
675
676 static void *connectivity_netlink_thread(void *arg) /* {{{ */
677 {
678   pthread_mutex_lock(&connectivity_threads_lock);
679
680   while (connectivity_netlink_thread_loop > 0) {
681     pthread_mutex_unlock(&connectivity_threads_lock);
682
683     int status = read_event(nl_sock, msg_handler);
684
685     pthread_mutex_lock(&connectivity_threads_lock);
686
687     if (status < 0) {
688       connectivity_netlink_thread_error = 1;
689       break;
690     }
691   } /* while (connectivity_netlink_thread_loop > 0) */
692
693   pthread_mutex_unlock(&connectivity_threads_lock);
694
695   return (void *)0;
696 } /* }}} void *connectivity_netlink_thread */
697
698 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
699 {
700   pthread_mutex_lock(&connectivity_threads_lock);
701
702   while (connectivity_dequeue_thread_loop > 0) {
703     pthread_mutex_unlock(&connectivity_threads_lock);
704
705     read_interface_status();
706
707     pthread_mutex_lock(&connectivity_threads_lock);
708   } /* while (connectivity_dequeue_thread_loop > 0) */
709
710   pthread_mutex_unlock(&connectivity_threads_lock);
711
712   return ((void *)0);
713 } /* }}} void *connectivity_dequeue_thread */
714
715 static int nl_connect() {
716   struct sockaddr_nl sa_nl = {
717       .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
718   };
719
720   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
721   if (nl_sock == -1) {
722     ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
723     return -1;
724   }
725
726   int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
727   if (rc == -1) {
728     ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
729     close(nl_sock);
730     return -1;
731   }
732
733   return 0;
734 }
735
736 static int start_netlink_thread(void) /* {{{ */
737 {
738   pthread_mutex_lock(&connectivity_threads_lock);
739
740   if (connectivity_netlink_thread_loop != 0) {
741     pthread_mutex_unlock(&connectivity_threads_lock);
742     return 0;
743   }
744
745   connectivity_netlink_thread_loop = 1;
746   connectivity_netlink_thread_error = 0;
747
748   int status;
749
750   if (nl_sock == -1) {
751     status = nl_connect();
752
753     if (status != 0) {
754       pthread_mutex_unlock(&connectivity_threads_lock);
755       return status;
756     }
757   }
758
759   status = plugin_thread_create(&connectivity_netlink_thread_id,
760                                 /* attr = */ NULL, connectivity_netlink_thread,
761                                 /* arg = */ (void *)0, "connectivity");
762   if (status != 0) {
763     connectivity_netlink_thread_loop = 0;
764     ERROR("connectivity plugin: Starting thread failed.");
765     pthread_mutex_unlock(&connectivity_threads_lock);
766
767     int status2 = close(nl_sock);
768
769     if (status2 != 0) {
770       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
771             status2, STRERRNO);
772     } else
773       nl_sock = -1;
774
775     return -1;
776   }
777
778   pthread_mutex_unlock(&connectivity_threads_lock);
779
780   return status;
781 }
782
783 static int start_dequeue_thread(void) /* {{{ */
784 {
785   pthread_mutex_lock(&connectivity_threads_lock);
786
787   if (connectivity_dequeue_thread_loop != 0) {
788     pthread_mutex_unlock(&connectivity_threads_lock);
789     return 0;
790   }
791
792   connectivity_dequeue_thread_loop = 1;
793
794   int status =
795       plugin_thread_create(&connectivity_dequeue_thread_id,
796                            /* attr = */ NULL, connectivity_dequeue_thread,
797                            /* arg = */ (void *)0, "connectivity");
798   if (status != 0) {
799     connectivity_dequeue_thread_loop = 0;
800     ERROR("connectivity plugin: Starting dequeue thread failed.");
801     pthread_mutex_unlock(&connectivity_threads_lock);
802     return -1;
803   }
804
805   pthread_mutex_unlock(&connectivity_threads_lock);
806
807   return status;
808 } /* }}} int start_dequeue_thread */
809
810 static int start_threads(void) /* {{{ */
811 {
812   int status = start_netlink_thread();
813   int status2 = start_dequeue_thread();
814
815   if (status != 0)
816     return status;
817   else
818     return status2;
819 } /* }}} int start_threads */
820
821 static int stop_netlink_thread(int shutdown) /* {{{ */
822 {
823   int socket_status;
824
825   if (nl_sock != -1) {
826     socket_status = close(nl_sock);
827     if (socket_status != 0) {
828       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
829             socket_status, STRERRNO);
830     } else
831       nl_sock = -1;
832   } else
833     socket_status = 0;
834
835   pthread_mutex_lock(&connectivity_threads_lock);
836
837   if (connectivity_netlink_thread_loop == 0) {
838     pthread_mutex_unlock(&connectivity_threads_lock);
839     // Thread has already been terminated, nothing more to attempt
840     return socket_status;
841   }
842
843   // Set thread termination status
844   connectivity_netlink_thread_loop = 0;
845   pthread_mutex_unlock(&connectivity_threads_lock);
846
847   // Let threads waiting on access to the interface list know to move
848   // on such that they'll see the thread's termination status
849   pthread_cond_broadcast(&connectivity_cond);
850
851   int thread_status;
852
853   if (shutdown == 1) {
854     // Since the thread is blocking, calling pthread_join
855     // doesn't actually succeed in stopping it.  It will stick around
856     // until a NETLINK message is received on the socket (at which
857     // it will realize that "connectivity_netlink_thread_loop" is 0 and will
858     // break out of the read loop and be allowed to die).  This is
859     // fine when the process isn't supposed to be exiting, but in
860     // the case of a process shutdown, we don't want to have an
861     // idle thread hanging around.  Calling pthread_cancel here in
862     // the case of a shutdown is just assures that the thread is
863     // gone and that the process has been fully terminated.
864
865     DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
866
867     thread_status = pthread_cancel(connectivity_netlink_thread_id);
868
869     if (thread_status != 0 && thread_status != ESRCH) {
870       ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
871             thread_status);
872       thread_status = -1;
873     } else
874       thread_status = 0;
875   } else {
876     thread_status =
877         pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
878     if (thread_status != 0 && thread_status != ESRCH) {
879       ERROR("connectivity plugin: Stopping netlink thread failed: %d",
880             thread_status);
881       thread_status = -1;
882     } else
883       thread_status = 0;
884   }
885
886   pthread_mutex_lock(&connectivity_threads_lock);
887   memset(&connectivity_netlink_thread_id, 0,
888          sizeof(connectivity_netlink_thread_id));
889   connectivity_netlink_thread_error = 0;
890   pthread_mutex_unlock(&connectivity_threads_lock);
891
892   DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
893
894   if (socket_status != 0)
895     return socket_status;
896   else
897     return thread_status;
898 }
899
900 static int stop_dequeue_thread(int shutdown) /* {{{ */
901 {
902   pthread_mutex_lock(&connectivity_threads_lock);
903
904   if (connectivity_dequeue_thread_loop == 0) {
905     pthread_mutex_unlock(&connectivity_threads_lock);
906     return -1;
907   }
908
909   // Set thread termination status
910   connectivity_dequeue_thread_loop = 0;
911   pthread_mutex_unlock(&connectivity_threads_lock);
912
913   // Let threads waiting on access to the interface list know to move
914   // on such that they'll see the threads termination status
915   pthread_cond_broadcast(&connectivity_cond);
916
917   int status;
918
919   if (shutdown == 1) {
920     // Calling pthread_cancel here in
921     // the case of a shutdown just assures that the thread is
922     // gone and that the process has been fully terminated.
923
924     DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
925
926     status = pthread_cancel(connectivity_dequeue_thread_id);
927
928     if (status != 0 && status != ESRCH) {
929       ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
930       status = -1;
931     } else
932       status = 0;
933   } else {
934     status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
935     if (status != 0 && status != ESRCH) {
936       ERROR("connectivity plugin: Stopping dequeue thread failed.");
937       status = -1;
938     } else
939       status = 0;
940   }
941
942   pthread_mutex_lock(&connectivity_threads_lock);
943   memset(&connectivity_dequeue_thread_id, 0,
944          sizeof(connectivity_dequeue_thread_id));
945   pthread_mutex_unlock(&connectivity_threads_lock);
946
947   DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
948
949   return status;
950 } /* }}} int stop_dequeue_thread */
951
952 static int stop_threads(int shutdown) /* {{{ */
953 {
954   int status = stop_netlink_thread(shutdown);
955   int status2 = stop_dequeue_thread(shutdown);
956
957   if (status != 0)
958     return status;
959   else
960     return status2;
961 } /* }}} int stop_threads */
962
963 static int connectivity_init(void) /* {{{ */
964 {
965   if (monitor_all_interfaces) {
966     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
967            "be monitored");
968   }
969
970   return start_threads();
971 } /* }}} int connectivity_init */
972
973 static int connectivity_config(const char *key, const char *value) /* {{{ */
974 {
975   if (ignorelist == NULL) {
976     ignorelist = ignorelist_create(/* invert = */ 1);
977
978     if (ignorelist == NULL)
979       return -1;
980   }
981
982   if (strcasecmp(key, "Interface") == 0) {
983     ignorelist_add(ignorelist, value);
984     monitor_all_interfaces = 0;
985   } else if (strcasecmp(key, "IgnoreSelected") == 0) {
986     int invert = 1;
987     if (IS_TRUE(value))
988       invert = 0;
989     ignorelist_set_invert(ignorelist, invert);
990   } else {
991     return -1;
992   }
993
994   return 0;
995 } /* }}} int connectivity_config */
996
997 static int connectivity_read(void) /* {{{ */
998 {
999   pthread_mutex_lock(&connectivity_threads_lock);
1000
1001   if (connectivity_netlink_thread_error != 0) {
1002
1003     pthread_mutex_unlock(&connectivity_threads_lock);
1004
1005     ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
1006           "it.");
1007
1008     stop_netlink_thread(0);
1009
1010     for (interface_list_t *il = interface_list_head; il != NULL;
1011          il = il->next) {
1012       il->status = LINK_STATE_UNKNOWN;
1013       il->prev_status = LINK_STATE_UNKNOWN;
1014       il->sent = 0;
1015     }
1016
1017     start_netlink_thread();
1018
1019     return -1;
1020   } /* if (connectivity_netlink_thread_error != 0) */
1021
1022   pthread_mutex_unlock(&connectivity_threads_lock);
1023
1024   return 0;
1025 } /* }}} int connectivity_read */
1026
1027 static int connectivity_shutdown(void) /* {{{ */
1028 {
1029   DEBUG("connectivity plugin: Shutting down thread.");
1030
1031   int status = stop_threads(1);
1032
1033   interface_list_t *il = interface_list_head;
1034   while (il != NULL) {
1035     interface_list_t *il_next;
1036
1037     il_next = il->next;
1038
1039     sfree(il->interface);
1040     sfree(il);
1041
1042     il = il_next;
1043   }
1044
1045   ignorelist_free(ignorelist);
1046
1047   return status;
1048 } /* }}} int connectivity_shutdown */
1049
1050 void module_register(void) {
1051   plugin_register_config("connectivity", connectivity_config, config_keys,
1052                          config_keys_num);
1053   plugin_register_init("connectivity", connectivity_init);
1054   plugin_register_read("connectivity", connectivity_read);
1055   plugin_register_shutdown("connectivity", connectivity_shutdown);
1056 } /* void module_register */