Properly detect which interfaces to monitor
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33 #include "utils_ignorelist.h"
34
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57
58 #define MYPROTO NETLINK_ROUTE
59
60 #define LINK_STATE_DOWN 0
61 #define LINK_STATE_UP 1
62 #define LINK_STATE_UNKNOWN 2
63
64 #define CONNECTIVITY_DOMAIN_FIELD "domain"
65 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
66 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
67 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
68 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
69 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
70 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
71 #define CONNECTIVITY_PRIORITY_FIELD "priority"
72 #define CONNECTIVITY_PRIORITY_VALUE "high"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
74 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
75 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
76 #define CONNECTIVITY_SEQUENCE_VALUE "0"
77 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
78 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
79 #define CONNECTIVITY_VERSION_FIELD "version"
80 #define CONNECTIVITY_VERSION_VALUE "1.0"
81
82 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
83 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
84 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
85 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
86 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
87 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
89 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
90   "stateChangeFieldsVersion"
91 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
92 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
93
94 /*
95  * Private data types
96  */
97 struct interface_list_s {
98   char *interface;
99
100   uint32_t status;
101   uint32_t prev_status;
102   uint32_t sent;
103   long long unsigned int timestamp;
104
105   struct interface_list_s *next;
106 };
107 typedef struct interface_list_s interface_list_t;
108
109 /*
110  * Private variables
111  */
112 static ignorelist_t *ignorelist = NULL;
113
114 static interface_list_t *interface_list_head = NULL;
115 static int monitor_all_interfaces = 1;
116
117 static int connectivity_thread_loop = 0;
118 static int connectivity_thread_error = 0;
119 static pthread_t connectivity_thread_id;
120 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
121 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
122 static struct mnl_socket *sock;
123 static int event_id = 0;
124
125 static const char *config_keys[] = {"Interface"};
126 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
127
128 /*
129  * Private functions
130  */
131
132 static int gen_message_payload(int state, int old_state, const char *interface,
133                                long long unsigned int timestamp, char **buf) {
134   const unsigned char *buf2;
135   yajl_gen g;
136   char json_str[DATA_MAX_NAME_LEN];
137
138 #if !defined(HAVE_YAJL_V2)
139   yajl_gen_config conf = {};
140
141   conf.beautify = 0;
142 #endif
143
144 #if HAVE_YAJL_V2
145   size_t len;
146   g = yajl_gen_alloc(NULL);
147   yajl_gen_config(g, yajl_gen_beautify, 0);
148 #else
149   unsigned int len;
150   g = yajl_gen_alloc(&conf, NULL);
151 #endif
152
153   yajl_gen_clear(g);
154
155   // *** BEGIN common event header ***
156
157   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
158     goto err;
159
160   // domain
161   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
162                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
163     goto err;
164
165   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
166                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
167     goto err;
168
169   // eventId
170   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
171                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
172       yajl_gen_status_ok)
173     goto err;
174
175   event_id = event_id + 1;
176   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
177   memset(json_str, '\0', DATA_MAX_NAME_LEN);
178   snprintf(json_str, event_id_len, "%d", event_id);
179
180   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
181     goto err;
182   }
183
184   // eventName
185   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
186                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
187       yajl_gen_status_ok)
188     goto err;
189
190   int event_name_len = 0;
191   event_name_len = event_name_len + strlen(interface);    // interface name
192   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
193   event_name_len =
194       event_name_len + 12; // "interface", 2 spaces and null-terminator
195   memset(json_str, '\0', DATA_MAX_NAME_LEN);
196   snprintf(json_str, event_name_len, "interface %s %s", interface,
197            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
198                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
199
200   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
201       yajl_gen_status_ok) {
202     goto err;
203   }
204
205   // lastEpochMicrosec
206   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
207                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
208       yajl_gen_status_ok)
209     goto err;
210
211   int last_epoch_microsec_len =
212       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
213   memset(json_str, '\0', DATA_MAX_NAME_LEN);
214   snprintf(json_str, last_epoch_microsec_len, "%llu",
215            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
216
217   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
218     goto err;
219   }
220
221   // priority
222   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
223                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
224       yajl_gen_status_ok)
225     goto err;
226
227   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
228                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
229       yajl_gen_status_ok)
230     goto err;
231
232   // reportingEntityName
233   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
234                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
235       yajl_gen_status_ok)
236     goto err;
237
238   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
239                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
240       yajl_gen_status_ok)
241     goto err;
242
243   // sequence
244   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
245                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
246       yajl_gen_status_ok)
247     goto err;
248
249   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
250                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
251       yajl_gen_status_ok)
252     goto err;
253
254   // sourceName
255   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
256                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
257       yajl_gen_status_ok)
258     goto err;
259
260   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
261       yajl_gen_status_ok)
262     goto err;
263
264   // startEpochMicrosec
265   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
266                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
267       yajl_gen_status_ok)
268     goto err;
269
270   int start_epoch_microsec_len =
271       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
272   memset(json_str, '\0', DATA_MAX_NAME_LEN);
273   snprintf(json_str, start_epoch_microsec_len, "%llu",
274            (long long unsigned int)timestamp);
275
276   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
277     goto err;
278   }
279
280   // version
281   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
282                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
283     goto err;
284
285   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
286                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
287     goto err;
288
289   // *** END common event header ***
290
291   // *** BEGIN state change fields ***
292
293   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
294                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
295       yajl_gen_status_ok)
296     goto err;
297
298   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
299     goto err;
300
301   // newState
302   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
303                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
304       yajl_gen_status_ok)
305     goto err;
306
307   int new_state_len =
308       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
309                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
310
311   if (yajl_gen_string(
312           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
313                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
314           new_state_len) != yajl_gen_status_ok)
315     goto err;
316
317   // oldState
318   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
319                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
320       yajl_gen_status_ok)
321     goto err;
322
323   int old_state_len =
324       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
325                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
326
327   if (yajl_gen_string(
328           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
329                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
330           old_state_len) != yajl_gen_status_ok)
331     goto err;
332
333   // stateChangeFieldsVersion
334   if (yajl_gen_string(g,
335                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
336                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
337       yajl_gen_status_ok)
338     goto err;
339
340   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
341                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
342       yajl_gen_status_ok)
343     goto err;
344
345   // stateInterface
346   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
347                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
348       yajl_gen_status_ok)
349     goto err;
350
351   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
352       yajl_gen_status_ok)
353     goto err;
354
355   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
356     goto err;
357
358   // *** END state change fields ***
359
360   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
361     goto err;
362
363   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
364     goto err;
365
366   *buf = malloc(strlen((char *)buf2) + 1);
367
368   if (*buf == NULL) {
369     char errbuf[1024];
370     ERROR("connectivity plugin: malloc failed during gen_message_payload: %s",
371           sstrerror(errno, errbuf, sizeof(errbuf)));
372     goto err;
373   }
374
375   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
376
377   yajl_gen_free(g);
378
379   return 0;
380
381 err:
382   yajl_gen_free(g);
383   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
384   return -1;
385 }
386
387 static interface_list_t *add_interface(const char *interface, int status,
388                                        int prev_status) {
389   interface_list_t *il;
390   char *interface2;
391
392   il = malloc(sizeof(*il));
393   if (il == NULL) {
394     char errbuf[1024];
395     ERROR("connectivity plugin: malloc failed during add_interface: %s",
396           sstrerror(errno, errbuf, sizeof(errbuf)));
397     return NULL;
398   }
399
400   interface2 = strdup(interface);
401   if (interface2 == NULL) {
402     char errbuf[1024];
403     sfree(il);
404     ERROR("connectivity plugin: strdup failed during add_interface: %s",
405           sstrerror(errno, errbuf, sizeof(errbuf)));
406     return NULL;
407   }
408
409   il->interface = interface2;
410   il->status = status;
411   il->prev_status = prev_status;
412   il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
413   il->sent = 0;
414   il->next = interface_list_head;
415   interface_list_head = il;
416
417   DEBUG("connectivity plugin: added interface %s", interface2);
418
419   return il;
420 }
421
422 static int connectivity_link_state(struct nlmsghdr *msg) {
423   int retval = 0;
424   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
425   struct nlattr *attr;
426   const char *dev = NULL;
427
428   pthread_mutex_lock(&connectivity_lock);
429
430   interface_list_t *il = NULL;
431
432   /* Scan attribute list for device name. */
433   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
434     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
435       continue;
436
437     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
438       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
439             "mnl_attr_validate "
440             "failed.");
441       pthread_mutex_unlock(&connectivity_lock);
442       return MNL_CB_ERROR;
443     }
444
445     dev = mnl_attr_get_str(attr);
446
447     // Check the list of interfaces we should monitor, if we've chosen
448     // a subset.  If we don't care about this one, abort.
449     if (ignorelist_match(ignorelist, dev) != 0) {
450       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
451             "interface: %s",
452             dev);
453       break;
454     }
455
456     for (il = interface_list_head; il != NULL; il = il->next)
457       if (strcmp(dev, il->interface) == 0)
458         break;
459
460     uint32_t prev_status;
461
462     if (il == NULL) {
463       // We haven't encountered this interface yet, so add it to the linked list
464       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
465
466       if (il == NULL) {
467         ERROR("connectivity plugin: unable to add interface %s during "
468               "connectivity_link_state",
469               dev);
470         return MNL_CB_ERROR;
471       }
472     }
473
474     prev_status = il->status;
475     il->status =
476         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
477     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
478
479     // If the new status is different than the previous status,
480     // store the previous status and set sent to zero
481     if (il->status != prev_status) {
482       il->prev_status = prev_status;
483       il->sent = 0;
484     }
485
486     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
487           il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
488
489     // no need to loop again, we found the interface name attr
490     // (otherwise the first if-statement in the loop would
491     // have moved us on with 'continue')
492     break;
493   }
494
495   pthread_mutex_unlock(&connectivity_lock);
496
497   return retval;
498 }
499
500 static int msg_handler(struct nlmsghdr *msg) {
501   switch (msg->nlmsg_type) {
502   case RTM_NEWADDR:
503   case RTM_DELADDR:
504   case RTM_NEWROUTE:
505   case RTM_DELROUTE:
506   case RTM_DELLINK:
507     // Not of interest in current version
508     break;
509   case RTM_NEWLINK:
510     connectivity_link_state(msg);
511     break;
512   default:
513     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
514           msg->nlmsg_type);
515     break;
516   }
517   return 0;
518 }
519
520 static int read_event(struct mnl_socket *nl,
521                       int (*msg_handler)(struct nlmsghdr *)) {
522   int status;
523   int ret = 0;
524   char buf[4096];
525   struct nlmsghdr *h;
526
527   if (nl == NULL)
528     return ret;
529
530   status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
531
532   if (status < 0) {
533     /* Socket non-blocking so bail out once we have read everything */
534     if (errno == EWOULDBLOCK || errno == EAGAIN)
535       return ret;
536
537     /* Anything else is an error */
538     ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
539           status);
540     return status;
541   }
542
543   if (status == 0) {
544     DEBUG("connectivity plugin: read_event: EOF\n");
545   }
546
547   /* We need to handle more than one message per 'recvmsg' */
548   for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
549        h = NLMSG_NEXT(h, status)) {
550     /* Finish reading */
551     if (h->nlmsg_type == NLMSG_DONE)
552       return ret;
553
554     /* Message is some kind of error */
555     if (h->nlmsg_type == NLMSG_ERROR) {
556       ERROR("connectivity plugin: read_event: Message is an error\n");
557       return -1; // Error
558     }
559
560     /* Call message handler */
561     if (msg_handler) {
562       ret = (*msg_handler)(h);
563       if (ret < 0) {
564         ERROR("connectivity plugin: read_event: Message handler error %d\n",
565               ret);
566         return ret;
567       }
568     } else {
569       ERROR("connectivity plugin: read_event: Error NULL message handler\n");
570       return -1;
571     }
572   }
573
574   return ret;
575 }
576
577 static void *connectivity_thread(void *arg) /* {{{ */
578 {
579   pthread_mutex_lock(&connectivity_lock);
580
581   while (connectivity_thread_loop > 0) {
582     int status;
583
584     pthread_mutex_unlock(&connectivity_lock);
585
586     status = read_event(sock, msg_handler);
587
588     pthread_mutex_lock(&connectivity_lock);
589
590     if (status < 0) {
591       connectivity_thread_error = 1;
592       break;
593     }
594
595     if (connectivity_thread_loop <= 0)
596       break;
597   } /* while (connectivity_thread_loop > 0) */
598
599   pthread_mutex_unlock(&connectivity_lock);
600
601   return ((void *)0);
602 } /* }}} void *connectivity_thread */
603
604 static int start_thread(void) /* {{{ */
605 {
606   int status;
607
608   pthread_mutex_lock(&connectivity_lock);
609
610   if (connectivity_thread_loop != 0) {
611     pthread_mutex_unlock(&connectivity_lock);
612     return (0);
613   }
614
615   connectivity_thread_loop = 1;
616   connectivity_thread_error = 0;
617
618   if (sock == NULL) {
619     sock = mnl_socket_open(NETLINK_ROUTE);
620     if (sock == NULL) {
621       ERROR(
622           "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
623       pthread_mutex_unlock(&connectivity_lock);
624       return (-1);
625     }
626
627     if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
628       ERROR(
629           "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
630       pthread_mutex_unlock(&connectivity_lock);
631       return (1);
632     }
633   }
634
635   status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
636                                 connectivity_thread,
637                                 /* arg = */ (void *)0, "connectivity");
638   if (status != 0) {
639     connectivity_thread_loop = 0;
640     ERROR("connectivity plugin: Starting thread failed.");
641     pthread_mutex_unlock(&connectivity_lock);
642     mnl_socket_close(sock);
643     return (-1);
644   }
645
646   pthread_mutex_unlock(&connectivity_lock);
647   return (0);
648 } /* }}} int start_thread */
649
650 static int stop_thread(int shutdown) /* {{{ */
651 {
652   int status;
653
654   if (sock != NULL)
655     mnl_socket_close(sock);
656
657   pthread_mutex_lock(&connectivity_lock);
658
659   if (connectivity_thread_loop == 0) {
660     pthread_mutex_unlock(&connectivity_lock);
661     return (-1);
662   }
663
664   connectivity_thread_loop = 0;
665   pthread_cond_broadcast(&connectivity_cond);
666   pthread_mutex_unlock(&connectivity_lock);
667
668   if (shutdown == 1) {
669     // Since the thread is blocking, calling pthread_join
670     // doesn't actually succeed in stopping it.  It will stick around
671     // until a NETLINK message is received on the socket (at which
672     // it will realize that "connectivity_thread_loop" is 0 and will
673     // break out of the read loop and be allowed to die).  This is
674     // fine when the process isn't supposed to be exiting, but in
675     // the case of a process shutdown, we don't want to have an
676     // idle thread hanging around.  Calling pthread_cancel here in
677     // the case of a shutdown is just assures that the thread is
678     // gone and that the process has been fully terminated.
679
680     DEBUG("connectivity plugin: Canceling thread for process shutdown");
681
682     status = pthread_cancel(connectivity_thread_id);
683
684     if (status != 0) {
685       ERROR("connectivity plugin: Unable to cancel thread: %d", status);
686       status = -1;
687     }
688   } else {
689     status = pthread_join(connectivity_thread_id, /* return = */ NULL);
690     if (status != 0) {
691       ERROR("connectivity plugin: Stopping thread failed.");
692       status = -1;
693     }
694   }
695
696   pthread_mutex_lock(&connectivity_lock);
697   memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
698   connectivity_thread_error = 0;
699   pthread_mutex_unlock(&connectivity_lock);
700
701   DEBUG("connectivity plugin: Finished requesting stop of thread");
702
703   return (status);
704 } /* }}} int stop_thread */
705
706 static int connectivity_init(void) /* {{{ */
707 {
708   if (monitor_all_interfaces) {
709     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
710            "be monitored");
711   }
712
713   return (start_thread());
714 } /* }}} int connectivity_init */
715
716 static int connectivity_config(const char *key, const char *value) /* {{{ */
717 {
718   if (ignorelist == NULL) {
719     ignorelist = ignorelist_create(/* invert = */ 1);
720   }
721
722   if (strcasecmp(key, "Interface") == 0) {
723     ignorelist_add(ignorelist, value);
724     monitor_all_interfaces = 0;
725   } else {
726     return (-1);
727   }
728
729   return (0);
730 } /* }}} int connectivity_config */
731
732 static void connectivity_dispatch_notification(
733     const char *interface, const char *type, /* {{{ */
734     gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
735   char *buf = NULL;
736   notification_t n = {
737       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
738
739   if (value == LINK_STATE_UP)
740     n.severity = NOTIF_OKAY;
741
742   sstrncpy(n.host, hostname_g, sizeof(n.host));
743   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
744   sstrncpy(n.type, "gauge", sizeof(n.type));
745   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
746
747   gen_message_payload(value, old_value, interface, timestamp, &buf);
748
749   notification_meta_t *m = calloc(1, sizeof(*m));
750
751   if (m == NULL) {
752     char errbuf[1024];
753     sfree(buf);
754     ERROR("connectivity plugin: unable to allocate metadata: %s",
755           sstrerror(errno, errbuf, sizeof(errbuf)));
756     return;
757   }
758
759   sstrncpy(m->name, "ves", sizeof(m->name));
760   m->nm_value.nm_string = sstrdup(buf);
761   m->type = NM_TYPE_STRING;
762   n.meta = m;
763
764   DEBUG("connectivity plugin: notification message: %s",
765         n.meta->nm_value.nm_string);
766
767   DEBUG("connectivity plugin: dispatching state %d for interface %s",
768         (int)value, interface);
769
770   plugin_dispatch_notification(&n);
771   plugin_notification_meta_free(n.meta);
772
773   // malloc'd in gen_message_payload
774   if (buf != NULL)
775     sfree(buf);
776 }
777
778 static int connectivity_read(void) /* {{{ */
779 {
780   if (connectivity_thread_error != 0) {
781     ERROR("connectivity plugin: The interface thread had a problem. Restarting "
782           "it.");
783
784     stop_thread(0);
785
786     for (interface_list_t *il = interface_list_head; il != NULL;
787          il = il->next) {
788       il->status = LINK_STATE_UNKNOWN;
789       il->prev_status = LINK_STATE_UNKNOWN;
790       il->sent = 0;
791     }
792
793     start_thread();
794
795     return (-1);
796   } /* if (connectivity_thread_error != 0) */
797
798   for (interface_list_t *il = interface_list_head; il != NULL;
799        il = il->next) /* {{{ */
800   {
801     uint32_t status;
802     uint32_t prev_status;
803     uint32_t sent;
804
805     pthread_mutex_lock(&connectivity_lock);
806
807     status = il->status;
808     prev_status = il->prev_status;
809     sent = il->sent;
810
811     if (status != prev_status && sent == 0) {
812       connectivity_dispatch_notification(il->interface, "gauge", status,
813                                          prev_status, il->timestamp);
814       il->sent = 1;
815     }
816
817     pthread_mutex_unlock(&connectivity_lock);
818   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
819
820   return (0);
821 } /* }}} int connectivity_read */
822
823 static int connectivity_shutdown(void) /* {{{ */
824 {
825   interface_list_t *il;
826
827   DEBUG("connectivity plugin: Shutting down thread.");
828   if (stop_thread(1) < 0)
829     return (-1);
830
831   il = interface_list_head;
832   while (il != NULL) {
833     interface_list_t *il_next;
834
835     il_next = il->next;
836
837     sfree(il->interface);
838     sfree(il);
839
840     il = il_next;
841   }
842
843   ignorelist_free(ignorelist);
844
845   return (0);
846 } /* }}} int connectivity_shutdown */
847
848 void module_register(void) {
849   plugin_register_config("connectivity", connectivity_config, config_keys,
850                          config_keys_num);
851   plugin_register_init("connectivity", connectivity_init);
852   plugin_register_read("connectivity", connectivity_read);
853   plugin_register_shutdown("connectivity", connectivity_shutdown);
854 } /* void module_register */