monitor all interfaces by default
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <net/if.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <libmnl/libmnl.h>
45 #include <linux/netlink.h>
46 #include <linux/rtnetlink.h>
47
48 #include <yajl/yajl_common.h>
49 #include <yajl/yajl_gen.h>
50 #if HAVE_YAJL_YAJL_VERSION_H
51 #include <yajl/yajl_version.h>
52 #endif
53 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
54 #define HAVE_YAJL_V2 1
55 #endif
56
57 #define MYPROTO NETLINK_ROUTE
58
59 #define LINK_STATE_DOWN 0
60 #define LINK_STATE_UP 1
61 #define LINK_STATE_UNKNOWN 2
62
63 #define CONNECTIVITY_DOMAIN_FIELD "domain"
64 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
65 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
66 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
67 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
68 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
69 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
70 #define CONNECTIVITY_PRIORITY_FIELD "priority"
71 #define CONNECTIVITY_PRIORITY_VALUE "high"
72 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
74 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
75 #define CONNECTIVITY_SEQUENCE_VALUE "0"
76 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
77 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
78 #define CONNECTIVITY_VERSION_FIELD "version"
79 #define CONNECTIVITY_VERSION_VALUE "1.0"
80
81 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
82 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
83 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
84 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
85 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
86 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
87 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
89   "stateChangeFieldsVersion"
90 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
91 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
92
93 /*
94  * Private data types
95  */
96 struct interface_list_s {
97   char *interface;
98
99   uint32_t status;
100   uint32_t prev_status;
101   uint32_t sent;
102   long long unsigned int timestamp;
103
104   struct interface_list_s *next;
105 };
106 typedef struct interface_list_s interface_list_t;
107
108 /*
109  * Private variables
110  */
111 static interface_list_t *interface_list_head = NULL;
112 static int monitor_all_interfaces = 0;
113
114 static int connectivity_thread_loop = 0;
115 static int connectivity_thread_error = 0;
116 static pthread_t connectivity_thread_id;
117 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
118 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
119 static struct mnl_socket *sock;
120 static int event_id = 0;
121
122 static const char *config_keys[] = {"Interface"};
123 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
124
125 /*
126  * Private functions
127  */
128
129 static int gen_message_payload(int state, int old_state, const char *interface,
130                                long long unsigned int timestamp, char **buf) {
131   const unsigned char *buf2;
132   yajl_gen g;
133   char json_str[DATA_MAX_NAME_LEN];
134
135 #if !defined(HAVE_YAJL_V2)
136   yajl_gen_config conf = {};
137
138   conf.beautify = 0;
139 #endif
140
141 #if HAVE_YAJL_V2
142   size_t len;
143   g = yajl_gen_alloc(NULL);
144   yajl_gen_config(g, yajl_gen_beautify, 0);
145 #else
146   unsigned int len;
147   g = yajl_gen_alloc(&conf, NULL);
148 #endif
149
150   yajl_gen_clear(g);
151
152   // *** BEGIN common event header ***
153
154   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
155     goto err;
156
157   // domain
158   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
159                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
160     goto err;
161
162   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
163                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
164     goto err;
165
166   // eventId
167   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
168                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
169       yajl_gen_status_ok)
170     goto err;
171
172   event_id = event_id + 1;
173   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
174   memset(json_str, '\0', DATA_MAX_NAME_LEN);
175   snprintf(json_str, event_id_len, "%d", event_id);
176
177   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
178     goto err;
179   }
180
181   // eventName
182   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
183                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
184       yajl_gen_status_ok)
185     goto err;
186
187   int event_name_len = 0;
188   event_name_len = event_name_len + strlen(interface);    // interface name
189   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
190   event_name_len =
191       event_name_len + 12; // "interface", 2 spaces and null-terminator
192   memset(json_str, '\0', DATA_MAX_NAME_LEN);
193   snprintf(json_str, event_name_len, "interface %s %s", interface,
194            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
195                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
196
197   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
198       yajl_gen_status_ok) {
199     goto err;
200   }
201
202   // lastEpochMicrosec
203   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
204                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
205       yajl_gen_status_ok)
206     goto err;
207
208   int last_epoch_microsec_len =
209       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
210   memset(json_str, '\0', DATA_MAX_NAME_LEN);
211   snprintf(json_str, last_epoch_microsec_len, "%llu",
212            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
213
214   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
215     goto err;
216   }
217
218   // priority
219   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
220                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
221       yajl_gen_status_ok)
222     goto err;
223
224   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
225                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
226       yajl_gen_status_ok)
227     goto err;
228
229   // reportingEntityName
230   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
231                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
232       yajl_gen_status_ok)
233     goto err;
234
235   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
236                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
237       yajl_gen_status_ok)
238     goto err;
239
240   // sequence
241   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
242                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
243       yajl_gen_status_ok)
244     goto err;
245
246   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
247                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
248       yajl_gen_status_ok)
249     goto err;
250
251   // sourceName
252   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
253                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
254       yajl_gen_status_ok)
255     goto err;
256
257   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
258       yajl_gen_status_ok)
259     goto err;
260
261   // startEpochMicrosec
262   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
263                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
264       yajl_gen_status_ok)
265     goto err;
266
267   int start_epoch_microsec_len =
268       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
269   memset(json_str, '\0', DATA_MAX_NAME_LEN);
270   snprintf(json_str, start_epoch_microsec_len, "%llu",
271            (long long unsigned int)timestamp);
272
273   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
274     goto err;
275   }
276
277   // version
278   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
279                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
280     goto err;
281
282   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
283                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
284     goto err;
285
286   // *** END common event header ***
287
288   // *** BEGIN state change fields ***
289
290   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
291                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
292       yajl_gen_status_ok)
293     goto err;
294
295   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
296     goto err;
297
298   // newState
299   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
300                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
301       yajl_gen_status_ok)
302     goto err;
303
304   int new_state_len =
305       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
306                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
307
308   if (yajl_gen_string(
309           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
310                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
311           new_state_len) != yajl_gen_status_ok)
312     goto err;
313
314   // oldState
315   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
316                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
317       yajl_gen_status_ok)
318     goto err;
319
320   int old_state_len =
321       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
322                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
323
324   if (yajl_gen_string(
325           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
326                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
327           old_state_len) != yajl_gen_status_ok)
328     goto err;
329
330   // stateChangeFieldsVersion
331   if (yajl_gen_string(g,
332                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
333                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
334       yajl_gen_status_ok)
335     goto err;
336
337   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
338                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
339       yajl_gen_status_ok)
340     goto err;
341
342   // stateInterface
343   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
344                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
345       yajl_gen_status_ok)
346     goto err;
347
348   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
349       yajl_gen_status_ok)
350     goto err;
351
352   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
353     goto err;
354
355   // *** END state change fields ***
356
357   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
358     goto err;
359
360   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
361     goto err;
362
363   *buf = malloc(strlen((char *)buf2) + 1);
364
365   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
366
367   yajl_gen_free(g);
368
369   return 0;
370
371 err:
372   yajl_gen_free(g);
373   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
374   return -1;
375 }
376
377 static interface_list_t *add_interface(const char *interface, int status,
378                                        int prev_status) {
379   interface_list_t *il;
380   char *interface2;
381
382   il = malloc(sizeof(*il));
383   if (il == NULL) {
384     char errbuf[1024];
385     ERROR("connectivity plugin: malloc failed during add_interface: %s",
386           sstrerror(errno, errbuf, sizeof(errbuf)));
387     return NULL;
388   }
389
390   interface2 = strdup(interface);
391   if (interface2 == NULL) {
392     char errbuf[1024];
393     sfree(il);
394     ERROR("connectivity plugin: strdup failed during add_interface: %s",
395           sstrerror(errno, errbuf, sizeof(errbuf)));
396     return NULL;
397   }
398
399   il->interface = interface2;
400   il->status = status;
401   il->prev_status = prev_status;
402   il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
403   il->sent = 0;
404   il->next = interface_list_head;
405   interface_list_head = il;
406
407   DEBUG("connectivity plugin: added interface %s", interface2);
408
409   return il;
410 }
411
412 static int connectivity_link_state(struct nlmsghdr *msg) {
413   int retval = 0;
414   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
415   struct nlattr *attr;
416   const char *dev = NULL;
417
418   pthread_mutex_lock(&connectivity_lock);
419
420   interface_list_t *il;
421
422   /* Scan attribute list for device name. */
423   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
424     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
425       continue;
426
427     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
428       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
429             "mnl_attr_validate "
430             "failed.");
431       pthread_mutex_unlock(&connectivity_lock);
432       return MNL_CB_ERROR;
433     }
434
435     dev = mnl_attr_get_str(attr);
436
437     for (il = interface_list_head; il != NULL; il = il->next)
438       if (strcmp(dev, il->interface) == 0)
439         break;
440
441     if (il == NULL && monitor_all_interfaces == 0) {
442       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
443             "interface: %s",
444             dev);
445     } else {
446       uint32_t prev_status;
447
448       if (il == NULL) {
449         // We're monitoring all interfaces and we haven't encountered this one
450         // yet, so add it to the linked list
451         il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
452
453         if (il == NULL) {
454           ERROR("connectivity plugin: unable to add interface %s during "
455                 "connectivity_link_state",
456                 dev);
457           return MNL_CB_ERROR;
458         }
459       }
460
461       prev_status = il->status;
462       il->status =
463           ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
464       il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
465
466       // If the new status is different than the previous status,
467       // store the previous status and set sent to zero
468       if (il->status != prev_status) {
469         il->prev_status = prev_status;
470         il->sent = 0;
471       }
472
473       DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
474             il->timestamp, dev,
475             ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
476     }
477
478     // no need to loop again, we found the interface name attr
479     // (otherwise the first if-statement in the loop would
480     // have moved us on with 'continue')
481     break;
482   }
483
484   pthread_mutex_unlock(&connectivity_lock);
485
486   return retval;
487 }
488
489 static int msg_handler(struct nlmsghdr *msg) {
490   switch (msg->nlmsg_type) {
491   case RTM_NEWADDR:
492   case RTM_DELADDR:
493   case RTM_NEWROUTE:
494   case RTM_DELROUTE:
495   case RTM_DELLINK:
496     // Not of interest in current version
497     break;
498   case RTM_NEWLINK:
499     connectivity_link_state(msg);
500     break;
501   default:
502     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
503           msg->nlmsg_type);
504     break;
505   }
506   return 0;
507 }
508
509 static int read_event(struct mnl_socket *nl,
510                       int (*msg_handler)(struct nlmsghdr *)) {
511   int status;
512   int ret = 0;
513   char buf[4096];
514   struct nlmsghdr *h;
515
516   if (nl == NULL)
517     return ret;
518
519   status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
520
521   if (status < 0) {
522     /* Socket non-blocking so bail out once we have read everything */
523     if (errno == EWOULDBLOCK || errno == EAGAIN)
524       return ret;
525
526     /* Anything else is an error */
527     ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
528           status);
529     return status;
530   }
531
532   if (status == 0) {
533     DEBUG("connectivity plugin: read_event: EOF\n");
534   }
535
536   /* We need to handle more than one message per 'recvmsg' */
537   for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
538        h = NLMSG_NEXT(h, status)) {
539     /* Finish reading */
540     if (h->nlmsg_type == NLMSG_DONE)
541       return ret;
542
543     /* Message is some kind of error */
544     if (h->nlmsg_type == NLMSG_ERROR) {
545       ERROR("connectivity plugin: read_event: Message is an error\n");
546       return -1; // Error
547     }
548
549     /* Call message handler */
550     if (msg_handler) {
551       ret = (*msg_handler)(h);
552       if (ret < 0) {
553         ERROR("connectivity plugin: read_event: Message handler error %d\n",
554               ret);
555         return ret;
556       }
557     } else {
558       ERROR("connectivity plugin: read_event: Error NULL message handler\n");
559       return -1;
560     }
561   }
562
563   return ret;
564 }
565
566 static void *connectivity_thread(void *arg) /* {{{ */
567 {
568   pthread_mutex_lock(&connectivity_lock);
569
570   while (connectivity_thread_loop > 0) {
571     int status;
572
573     pthread_mutex_unlock(&connectivity_lock);
574
575     status = read_event(sock, msg_handler);
576
577     pthread_mutex_lock(&connectivity_lock);
578
579     if (status < 0) {
580       connectivity_thread_error = 1;
581       break;
582     }
583
584     if (connectivity_thread_loop <= 0)
585       break;
586   } /* while (connectivity_thread_loop > 0) */
587
588   pthread_mutex_unlock(&connectivity_lock);
589
590   return ((void *)0);
591 } /* }}} void *connectivity_thread */
592
593 static int start_thread(void) /* {{{ */
594 {
595   int status;
596
597   pthread_mutex_lock(&connectivity_lock);
598
599   if (connectivity_thread_loop != 0) {
600     pthread_mutex_unlock(&connectivity_lock);
601     return (0);
602   }
603
604   connectivity_thread_loop = 1;
605   connectivity_thread_error = 0;
606
607   if (sock == NULL) {
608     sock = mnl_socket_open(NETLINK_ROUTE);
609     if (sock == NULL) {
610       ERROR(
611           "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
612       pthread_mutex_unlock(&connectivity_lock);
613       return (-1);
614     }
615
616     if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
617       ERROR(
618           "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
619       pthread_mutex_unlock(&connectivity_lock);
620       return (1);
621     }
622   }
623
624   status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
625                                 connectivity_thread,
626                                 /* arg = */ (void *)0, "connectivity");
627   if (status != 0) {
628     connectivity_thread_loop = 0;
629     ERROR("connectivity plugin: Starting thread failed.");
630     pthread_mutex_unlock(&connectivity_lock);
631     mnl_socket_close(sock);
632     return (-1);
633   }
634
635   pthread_mutex_unlock(&connectivity_lock);
636   return (0);
637 } /* }}} int start_thread */
638
639 static int stop_thread(int shutdown) /* {{{ */
640 {
641   int status;
642
643   if (sock != NULL)
644     mnl_socket_close(sock);
645
646   pthread_mutex_lock(&connectivity_lock);
647
648   if (connectivity_thread_loop == 0) {
649     pthread_mutex_unlock(&connectivity_lock);
650     return (-1);
651   }
652
653   connectivity_thread_loop = 0;
654   pthread_cond_broadcast(&connectivity_cond);
655   pthread_mutex_unlock(&connectivity_lock);
656
657   if (shutdown == 1) {
658     // Since the thread is blocking, calling pthread_join
659     // doesn't actually succeed in stopping it.  It will stick around
660     // until a NETLINK message is received on the socket (at which
661     // it will realize that "connectivity_thread_loop" is 0 and will
662     // break out of the read loop and be allowed to die).  This is
663     // fine when the process isn't supposed to be exiting, but in
664     // the case of a process shutdown, we don't want to have an
665     // idle thread hanging around.  Calling pthread_cancel here in
666     // the case of a shutdown is just assures that the thread is
667     // gone and that the process has been fully terminated.
668
669     DEBUG("connectivity plugin: Canceling thread for process shutdown");
670
671     status = pthread_cancel(connectivity_thread_id);
672
673     if (status != 0) {
674       ERROR("connectivity plugin: Unable to cancel thread: %d", status);
675       status = -1;
676     }
677   } else {
678     status = pthread_join(connectivity_thread_id, /* return = */ NULL);
679     if (status != 0) {
680       ERROR("connectivity plugin: Stopping thread failed.");
681       status = -1;
682     }
683   }
684
685   pthread_mutex_lock(&connectivity_lock);
686   memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
687   connectivity_thread_error = 0;
688   pthread_mutex_unlock(&connectivity_lock);
689
690   DEBUG("connectivity plugin: Finished requesting stop of thread");
691
692   return (status);
693 } /* }}} int stop_thread */
694
695 static int connectivity_init(void) /* {{{ */
696 {
697   if (interface_list_head == NULL) {
698     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
699            "be monitored");
700     monitor_all_interfaces = 1;
701   }
702
703   return (start_thread());
704 } /* }}} int connectivity_init */
705
706 static int connectivity_config(const char *key, const char *value) /* {{{ */
707 {
708   if (strcasecmp(key, "Interface") == 0) {
709     interface_list_t *il =
710         add_interface(value, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
711     if (il == NULL) {
712       ERROR("connectivity plugin: unable to add interface %s during "
713             "connectivity_init",
714             value);
715       return (-1);
716     }
717   } else {
718     return (-1);
719   }
720
721   return (0);
722 } /* }}} int connectivity_config */
723
724 static void connectivity_dispatch_notification(
725     const char *interface, const char *type, /* {{{ */
726     gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
727   char *buf = NULL;
728   notification_t n = {
729       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
730
731   if (value == LINK_STATE_UP)
732     n.severity = NOTIF_OKAY;
733
734   char hostname[1024];
735   gethostname(hostname, sizeof(hostname));
736
737   sstrncpy(n.host, hostname, sizeof(n.host));
738   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
739   sstrncpy(n.type, "gauge", sizeof(n.type));
740   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
741
742   gen_message_payload(value, old_value, interface, timestamp, &buf);
743
744   notification_meta_t *m = calloc(1, sizeof(*m));
745
746   if (m == NULL) {
747     char errbuf[1024];
748     sfree(buf);
749     ERROR("connectivity plugin: unable to allocate metadata: %s",
750           sstrerror(errno, errbuf, sizeof(errbuf)));
751     return;
752   }
753
754   sstrncpy(m->name, "ves", sizeof(m->name));
755   m->nm_value.nm_string = sstrdup(buf);
756   m->type = NM_TYPE_STRING;
757   n.meta = m;
758
759   DEBUG("connectivity plugin: notification message: %s",
760         n.meta->nm_value.nm_string);
761
762   DEBUG("connectivity plugin: dispatching state %d for interface %s",
763         (int)value, interface);
764
765   plugin_dispatch_notification(&n);
766   plugin_notification_meta_free(n.meta);
767
768   // malloc'd in gen_message_payload
769   if (buf != NULL)
770     sfree(buf);
771 }
772
773 static int connectivity_read(void) /* {{{ */
774 {
775   if (connectivity_thread_error != 0) {
776     ERROR("connectivity plugin: The interface thread had a problem. Restarting "
777           "it.");
778
779     stop_thread(0);
780
781     for (interface_list_t *il = interface_list_head; il != NULL;
782          il = il->next) {
783       il->status = LINK_STATE_UNKNOWN;
784       il->prev_status = LINK_STATE_UNKNOWN;
785       il->sent = 0;
786     }
787
788     start_thread();
789
790     return (-1);
791   } /* if (connectivity_thread_error != 0) */
792
793   for (interface_list_t *il = interface_list_head; il != NULL;
794        il = il->next) /* {{{ */
795   {
796     uint32_t status;
797     uint32_t prev_status;
798     uint32_t sent;
799
800     pthread_mutex_lock(&connectivity_lock);
801
802     status = il->status;
803     prev_status = il->prev_status;
804     sent = il->sent;
805
806     if (status != prev_status && sent == 0) {
807       connectivity_dispatch_notification(il->interface, "gauge", status,
808                                          prev_status, il->timestamp);
809       il->sent = 1;
810     }
811
812     pthread_mutex_unlock(&connectivity_lock);
813   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
814
815   return (0);
816 } /* }}} int connectivity_read */
817
818 static int connectivity_shutdown(void) /* {{{ */
819 {
820   interface_list_t *il;
821
822   DEBUG("connectivity plugin: Shutting down thread.");
823   if (stop_thread(1) < 0)
824     return (-1);
825
826   il = interface_list_head;
827   while (il != NULL) {
828     interface_list_t *il_next;
829
830     il_next = il->next;
831
832     sfree(il->interface);
833     sfree(il);
834
835     il = il_next;
836   }
837
838   return (0);
839 } /* }}} int connectivity_shutdown */
840
841 void module_register(void) {
842   plugin_register_config("connectivity", connectivity_config, config_keys,
843                          config_keys_num);
844   plugin_register_init("connectivity", connectivity_init);
845   plugin_register_read("connectivity", connectivity_read);
846   plugin_register_shutdown("connectivity", connectivity_shutdown);
847 } /* void module_register */