Styling/optimization cleanup + proper use of cdtime
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33 #include "utils_ignorelist.h"
34
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57
58 #define MYPROTO NETLINK_ROUTE
59
60 #define LINK_STATE_DOWN 0
61 #define LINK_STATE_UP 1
62 #define LINK_STATE_UNKNOWN 2
63
64 #define CONNECTIVITY_DOMAIN_FIELD "domain"
65 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
66 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
67 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
68 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
69 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
70 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
71 #define CONNECTIVITY_PRIORITY_FIELD "priority"
72 #define CONNECTIVITY_PRIORITY_VALUE "high"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
74 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
75 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
76 #define CONNECTIVITY_SEQUENCE_VALUE "0"
77 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
78 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
79 #define CONNECTIVITY_VERSION_FIELD "version"
80 #define CONNECTIVITY_VERSION_VALUE "1.0"
81
82 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
83 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
84 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
85 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
86 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
87 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
89 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
90   "stateChangeFieldsVersion"
91 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
92 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
93
94 /*
95  * Private data types
96  */
97
98 struct interface_list_s {
99   char *interface;
100
101   uint32_t status;
102   uint32_t prev_status;
103   uint32_t sent;
104   cdtime_t timestamp;
105
106   struct interface_list_s *next;
107 };
108 typedef struct interface_list_s interface_list_t;
109
110 /*
111  * Private variables
112  */
113
114 static ignorelist_t *ignorelist = NULL;
115
116 static interface_list_t *interface_list_head = NULL;
117 static int monitor_all_interfaces = 1;
118
119 static int connectivity_netlink_thread_loop = 0;
120 static int connectivity_netlink_thread_error = 0;
121 static pthread_t connectivity_netlink_thread_id;
122 static int connectivity_dequeue_thread_loop = 0;
123 static pthread_t connectivity_dequeue_thread_id;
124 static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
125 static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
126 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
127 static int nl_sock = -1;
128 static int event_id = 0;
129 static int statuses_to_send = 0;
130
131 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
132 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
133
134 /*
135  * Private functions
136  */
137
138 static int gen_message_payload(int state, int old_state, const char *interface,
139                                cdtime_t timestamp, char **buf) {
140   const unsigned char *buf2;
141   yajl_gen g;
142   char json_str[DATA_MAX_NAME_LEN];
143
144 #if !defined(HAVE_YAJL_V2)
145   yajl_gen_config conf = {0};
146 #endif
147
148 #if HAVE_YAJL_V2
149   size_t len;
150   g = yajl_gen_alloc(NULL);
151   yajl_gen_config(g, yajl_gen_beautify, 0);
152 #else
153   unsigned int len;
154   g = yajl_gen_alloc(&conf, NULL);
155 #endif
156
157   yajl_gen_clear(g);
158
159   // *** BEGIN common event header ***
160
161   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
162     goto err;
163
164   // domain
165   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
166                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
167     goto err;
168
169   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
170                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
171     goto err;
172
173   // eventId
174   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
175                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
176       yajl_gen_status_ok)
177     goto err;
178
179   event_id = event_id + 1;
180   if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
181     goto err;
182   }
183
184   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
185     goto err;
186   }
187
188   // eventName
189   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
190                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
191       yajl_gen_status_ok)
192     goto err;
193
194   if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
195                (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
196                            : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
197     goto err;
198   }
199
200   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
201       yajl_gen_status_ok) {
202     goto err;
203   }
204
205   // lastEpochMicrosec
206   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
207                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
208       yajl_gen_status_ok)
209     goto err;
210
211   if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
212                CDTIME_T_TO_US(cdtime())) < 0) {
213     goto err;
214   }
215
216   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
217     goto err;
218   }
219
220   // priority
221   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
222                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
223       yajl_gen_status_ok)
224     goto err;
225
226   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
227                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
228       yajl_gen_status_ok)
229     goto err;
230
231   // reportingEntityName
232   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
233                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
234       yajl_gen_status_ok)
235     goto err;
236
237   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
238                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
239       yajl_gen_status_ok)
240     goto err;
241
242   // sequence
243   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
244                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
245       yajl_gen_status_ok)
246     goto err;
247
248   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
249                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
250       yajl_gen_status_ok)
251     goto err;
252
253   // sourceName
254   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
255                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
256       yajl_gen_status_ok)
257     goto err;
258
259   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
260       yajl_gen_status_ok)
261     goto err;
262
263   // startEpochMicrosec
264   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
265                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
266       yajl_gen_status_ok)
267     goto err;
268
269   if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
270                CDTIME_T_TO_US(timestamp)) < 0) {
271     goto err;
272   }
273
274   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
275     goto err;
276   }
277
278   // version
279   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
280                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
281     goto err;
282
283   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
284                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
285     goto err;
286
287   // *** END common event header ***
288
289   // *** BEGIN state change fields ***
290
291   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
292                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
293       yajl_gen_status_ok)
294     goto err;
295
296   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
297     goto err;
298
299   // newState
300   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
301                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
302       yajl_gen_status_ok)
303     goto err;
304
305   int new_state_len =
306       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
307                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
308
309   if (yajl_gen_string(
310           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
311                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
312           new_state_len) != yajl_gen_status_ok)
313     goto err;
314
315   // oldState
316   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
317                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
318       yajl_gen_status_ok)
319     goto err;
320
321   int old_state_len =
322       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
323                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
324
325   if (yajl_gen_string(
326           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
327                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
328           old_state_len) != yajl_gen_status_ok)
329     goto err;
330
331   // stateChangeFieldsVersion
332   if (yajl_gen_string(g,
333                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
334                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
335       yajl_gen_status_ok)
336     goto err;
337
338   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
339                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
340       yajl_gen_status_ok)
341     goto err;
342
343   // stateInterface
344   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
345                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
346       yajl_gen_status_ok)
347     goto err;
348
349   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
350       yajl_gen_status_ok)
351     goto err;
352
353   // close state change and header fields
354   if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
355       yajl_gen_map_close(g) != yajl_gen_status_ok)
356     goto err;
357
358   // *** END state change fields ***
359
360   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
361     goto err;
362
363   *buf = strdup((char *)buf2);
364
365   if (*buf == NULL) {
366     ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
367           STRERRNO);
368     goto err;
369   }
370
371   yajl_gen_free(g);
372
373   return 0;
374
375 err:
376   yajl_gen_free(g);
377   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
378   return -1;
379 }
380
381 static interface_list_t *add_interface(const char *interface, int status,
382                                        int prev_status) {
383   interface_list_t *il = calloc(1, sizeof(*il));
384
385   if (il == NULL) {
386     ERROR("connectivity plugin: calloc failed during add_interface: %s",
387           STRERRNO);
388     return NULL;
389   }
390
391   char *interface2 = strdup(interface);
392   if (interface2 == NULL) {
393     sfree(il);
394     ERROR("connectivity plugin: strdup failed during add_interface: %s",
395           STRERRNO);
396     return NULL;
397   }
398
399   il->interface = interface2;
400   il->status = status;
401   il->prev_status = prev_status;
402   il->timestamp = cdtime();
403   il->sent = 0;
404   il->next = interface_list_head;
405   interface_list_head = il;
406
407   DEBUG("connectivity plugin: added interface %s", interface2);
408
409   return il;
410 }
411
412 static int connectivity_link_state(struct nlmsghdr *msg) {
413   pthread_mutex_lock(&connectivity_data_lock);
414
415   struct nlattr *attr;
416   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
417
418   /* Scan attribute list for device name. */
419   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
420     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
421       continue;
422
423     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
424       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
425             "mnl_attr_validate "
426             "failed.");
427       pthread_mutex_unlock(&connectivity_data_lock);
428       return MNL_CB_ERROR;
429     }
430
431     const char *dev = mnl_attr_get_str(attr);
432
433     // Check the list of interfaces we should monitor, if we've chosen
434     // a subset.  If we don't care about this one, abort.
435     if (ignorelist_match(ignorelist, dev) != 0) {
436       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
437             "interface: %s",
438             dev);
439       break;
440     }
441
442     interface_list_t *il = NULL;
443
444     for (il = interface_list_head; il != NULL; il = il->next)
445       if (strcmp(dev, il->interface) == 0)
446         break;
447
448     if (il == NULL) {
449       // We haven't encountered this interface yet, so add it to the linked list
450       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
451
452       if (il == NULL) {
453         ERROR("connectivity plugin: unable to add interface %s during "
454               "connectivity_link_state",
455               dev);
456         return MNL_CB_ERROR;
457       }
458     }
459
460     uint32_t prev_status = il->status;
461     il->status =
462         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
463     il->timestamp = cdtime();
464
465     // If the new status is different than the previous status,
466     // store the previous status and set sent to zero, and set the
467     // global flag to indicate there are statuses to dispatch
468     if (il->status != prev_status) {
469       il->prev_status = prev_status;
470       il->sent = 0;
471       statuses_to_send = 1;
472     }
473
474     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
475           il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
476
477     // no need to loop again, we found the interface name attr
478     // (otherwise the first if-statement in the loop would
479     // have moved us on with 'continue')
480     break;
481   }
482
483   pthread_mutex_unlock(&connectivity_data_lock);
484
485   return 0;
486 }
487
488 static int msg_handler(struct nlmsghdr *msg) {
489   // We are only interested in RTM_NEWLINK messages
490   if (msg->nlmsg_type != RTM_NEWLINK) {
491     return 0;
492   }
493   return connectivity_link_state(msg);
494 }
495
496 static int read_event(int (*msg_handler)(struct nlmsghdr *)) {
497   int ret = 0;
498   int recv_flags = MSG_DONTWAIT;
499
500   if (nl_sock == -1 || msg_handler == NULL)
501     return EINVAL;
502
503   while (42) {
504     pthread_mutex_lock(&connectivity_threads_lock);
505
506     if (connectivity_netlink_thread_loop <= 0) {
507       pthread_mutex_unlock(&connectivity_threads_lock);
508       return ret;
509     }
510
511     pthread_mutex_unlock(&connectivity_threads_lock);
512
513     char buf[4096];
514     int status = recv(nl_sock, buf, sizeof(buf), recv_flags);
515
516     if (status < 0) {
517
518       // If there were no more messages to drain from the socket,
519       // then signal the dequeue thread and allow it to dispatch
520       // any saved interface status changes.  Then continue, but
521       // block and wait for new messages
522       if (errno == EWOULDBLOCK || errno == EAGAIN) {
523         pthread_cond_signal(&connectivity_cond);
524
525         recv_flags = 0;
526         continue;
527       }
528
529       if (errno == EINTR) {
530         // Interrupt, so just continue and try again
531         continue;
532       }
533
534       /* Anything else is an error */
535       ERROR("connectivity plugin: read_event: Error recv: %d", status);
536       return status;
537     }
538
539     // Message received successfully, so we'll stop blocking on the
540     // receive call for now (until we get a "would block" error, which
541     // will be handled above)
542     recv_flags = MSG_DONTWAIT;
543
544     if (status == 0) {
545       DEBUG("connectivity plugin: read_event: EOF");
546     }
547
548     /* We need to handle more than one message per 'recvmsg' */
549     for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
550          NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
551       /* Finish reading */
552       if (h->nlmsg_type == NLMSG_DONE)
553         return ret;
554
555       /* Message is some kind of error */
556       if (h->nlmsg_type == NLMSG_ERROR) {
557         struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h);
558         ERROR("connectivity plugin: read_event: Message is an error: %d",
559               l_err->error);
560         return -1; // Error
561       }
562
563       /* Call message handler */
564       if (msg_handler) {
565         ret = (*msg_handler)(h);
566         if (ret < 0) {
567           ERROR("connectivity plugin: read_event: Message handler error %d",
568                 ret);
569           return ret;
570         }
571       } else {
572         ERROR("connectivity plugin: read_event: Error NULL message handler");
573         return -1;
574       }
575     }
576   }
577
578   return ret;
579 }
580
581 static void connectivity_dispatch_notification(const char *interface,
582                                                gauge_t value, gauge_t old_value,
583                                                cdtime_t timestamp) {
584
585   notification_t n = {
586       .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
587       .time = cdtime(),
588       .plugin = "connectivity",
589       .type = "gauge",
590       .type_instance = "interface_status",
591   };
592
593   sstrncpy(n.host, hostname_g, sizeof(n.host));
594   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
595
596   char *buf = NULL;
597
598   gen_message_payload(value, old_value, interface, timestamp, &buf);
599
600   int status = plugin_notification_meta_add_string(&n, "ves", buf);
601
602   if (status < 0) {
603     sfree(buf);
604     ERROR("connectivity plugin: unable to set notification VES metadata: %s",
605           STRERRNO);
606     return;
607   }
608
609   DEBUG("connectivity plugin: notification VES metadata: %s",
610         n.meta->nm_value.nm_string);
611
612   DEBUG("connectivity plugin: dispatching state %d for interface %s",
613         (int)value, interface);
614
615   plugin_dispatch_notification(&n);
616   plugin_notification_meta_free(n.meta);
617
618   // strdup'd in gen_message_payload
619   if (buf != NULL)
620     sfree(buf);
621 }
622
623 // NOTE: Caller MUST hold connectivity_data_lock when calling this function
624 static void send_interface_status() {
625   for (interface_list_t *il = interface_list_head; il != NULL;
626        il = il->next) /* {{{ */
627   {
628     uint32_t status = il->status;
629     uint32_t prev_status = il->prev_status;
630     uint32_t sent = il->sent;
631
632     if (status != prev_status && sent == 0) {
633       connectivity_dispatch_notification(il->interface, status, prev_status,
634                                          il->timestamp);
635       il->sent = 1;
636     }
637   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
638
639   statuses_to_send = 0;
640 }
641
642 static void read_interface_status() /* {{{ */
643 {
644   pthread_mutex_lock(&connectivity_data_lock);
645
646   // If we don't have any interface statuses to dispatch,
647   // then we wait until signalled
648   if (!statuses_to_send)
649     pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
650
651   send_interface_status();
652
653   pthread_mutex_unlock(&connectivity_data_lock);
654 } /* }}} int *read_interface_status */
655
656 static void *connectivity_netlink_thread(void *arg) /* {{{ */
657 {
658   pthread_mutex_lock(&connectivity_threads_lock);
659
660   while (connectivity_netlink_thread_loop > 0) {
661     pthread_mutex_unlock(&connectivity_threads_lock);
662
663     int status = read_event(msg_handler);
664
665     pthread_mutex_lock(&connectivity_threads_lock);
666
667     if (status < 0) {
668       connectivity_netlink_thread_error = 1;
669       break;
670     }
671   } /* while (connectivity_netlink_thread_loop > 0) */
672
673   pthread_mutex_unlock(&connectivity_threads_lock);
674
675   return (void *)0;
676 } /* }}} void *connectivity_netlink_thread */
677
678 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
679 {
680   pthread_mutex_lock(&connectivity_threads_lock);
681
682   while (connectivity_dequeue_thread_loop > 0) {
683     pthread_mutex_unlock(&connectivity_threads_lock);
684
685     read_interface_status();
686
687     pthread_mutex_lock(&connectivity_threads_lock);
688   } /* while (connectivity_dequeue_thread_loop > 0) */
689
690   pthread_mutex_unlock(&connectivity_threads_lock);
691
692   return ((void *)0);
693 } /* }}} void *connectivity_dequeue_thread */
694
695 static int nl_connect() {
696   struct sockaddr_nl sa_nl = {
697       .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
698   };
699
700   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
701   if (nl_sock == -1) {
702     ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
703     return -1;
704   }
705
706   int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
707   if (rc == -1) {
708     ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
709     close(nl_sock);
710     nl_sock = -1;
711     return -1;
712   }
713
714   return 0;
715 }
716
717 static int start_netlink_thread(void) /* {{{ */
718 {
719   pthread_mutex_lock(&connectivity_threads_lock);
720
721   if (connectivity_netlink_thread_loop != 0) {
722     pthread_mutex_unlock(&connectivity_threads_lock);
723     return 0;
724   }
725
726   connectivity_netlink_thread_loop = 1;
727   connectivity_netlink_thread_error = 0;
728
729   int status;
730
731   if (nl_sock == -1) {
732     status = nl_connect();
733
734     if (status != 0) {
735       pthread_mutex_unlock(&connectivity_threads_lock);
736       return status;
737     }
738   }
739
740   status = plugin_thread_create(&connectivity_netlink_thread_id,
741                                 /* attr = */ NULL, connectivity_netlink_thread,
742                                 /* arg = */ (void *)0, "connectivity");
743   if (status != 0) {
744     connectivity_netlink_thread_loop = 0;
745     ERROR("connectivity plugin: Starting thread failed.");
746     pthread_mutex_unlock(&connectivity_threads_lock);
747
748     int status2 = close(nl_sock);
749
750     if (status2 != 0) {
751       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
752             status2, STRERRNO);
753     }
754
755     nl_sock = -1;
756
757     return -1;
758   }
759
760   pthread_mutex_unlock(&connectivity_threads_lock);
761
762   return status;
763 }
764
765 static int start_dequeue_thread(void) /* {{{ */
766 {
767   pthread_mutex_lock(&connectivity_threads_lock);
768
769   if (connectivity_dequeue_thread_loop != 0) {
770     pthread_mutex_unlock(&connectivity_threads_lock);
771     return 0;
772   }
773
774   connectivity_dequeue_thread_loop = 1;
775
776   int status =
777       plugin_thread_create(&connectivity_dequeue_thread_id,
778                            /* attr = */ NULL, connectivity_dequeue_thread,
779                            /* arg = */ (void *)0, "connectivity");
780   if (status != 0) {
781     connectivity_dequeue_thread_loop = 0;
782     ERROR("connectivity plugin: Starting dequeue thread failed.");
783     pthread_mutex_unlock(&connectivity_threads_lock);
784     return -1;
785   }
786
787   pthread_mutex_unlock(&connectivity_threads_lock);
788
789   return status;
790 } /* }}} int start_dequeue_thread */
791
792 static int start_threads(void) /* {{{ */
793 {
794   int status = start_netlink_thread();
795   int status2 = start_dequeue_thread();
796
797   if (status != 0)
798     return status;
799   else
800     return status2;
801 } /* }}} int start_threads */
802
803 static int stop_netlink_thread(int shutdown) /* {{{ */
804 {
805   int socket_status;
806
807   if (nl_sock != -1) {
808     socket_status = close(nl_sock);
809     if (socket_status != 0) {
810       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
811             socket_status, STRERRNO);
812     }
813
814     nl_sock = -1;
815   } else
816     socket_status = 0;
817
818   pthread_mutex_lock(&connectivity_threads_lock);
819
820   if (connectivity_netlink_thread_loop == 0) {
821     pthread_mutex_unlock(&connectivity_threads_lock);
822     // Thread has already been terminated, nothing more to attempt
823     return socket_status;
824   }
825
826   // Set thread termination status
827   connectivity_netlink_thread_loop = 0;
828   pthread_mutex_unlock(&connectivity_threads_lock);
829
830   // Let threads waiting on access to the interface list know to move
831   // on such that they'll see the thread's termination status
832   pthread_cond_broadcast(&connectivity_cond);
833
834   int thread_status;
835
836   if (shutdown == 1) {
837     // Since the thread is blocking, calling pthread_join
838     // doesn't actually succeed in stopping it.  It will stick around
839     // until a NETLINK message is received on the socket (at which
840     // it will realize that "connectivity_netlink_thread_loop" is 0 and will
841     // break out of the read loop and be allowed to die).  This is
842     // fine when the process isn't supposed to be exiting, but in
843     // the case of a process shutdown, we don't want to have an
844     // idle thread hanging around.  Calling pthread_cancel here in
845     // the case of a shutdown is just assures that the thread is
846     // gone and that the process has been fully terminated.
847
848     DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
849
850     thread_status = pthread_cancel(connectivity_netlink_thread_id);
851
852     if (thread_status != 0 && thread_status != ESRCH) {
853       ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
854             thread_status);
855       thread_status = -1;
856     } else
857       thread_status = 0;
858   } else {
859     thread_status =
860         pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
861     if (thread_status != 0 && thread_status != ESRCH) {
862       ERROR("connectivity plugin: Stopping netlink thread failed: %d",
863             thread_status);
864       thread_status = -1;
865     } else
866       thread_status = 0;
867   }
868
869   pthread_mutex_lock(&connectivity_threads_lock);
870   memset(&connectivity_netlink_thread_id, 0,
871          sizeof(connectivity_netlink_thread_id));
872   connectivity_netlink_thread_error = 0;
873   pthread_mutex_unlock(&connectivity_threads_lock);
874
875   DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
876
877   if (socket_status != 0)
878     return socket_status;
879   else
880     return thread_status;
881 }
882
883 static int stop_dequeue_thread() /* {{{ */
884 {
885   pthread_mutex_lock(&connectivity_threads_lock);
886
887   if (connectivity_dequeue_thread_loop == 0) {
888     pthread_mutex_unlock(&connectivity_threads_lock);
889     return -1;
890   }
891
892   // Set thread termination status
893   connectivity_dequeue_thread_loop = 0;
894   pthread_mutex_unlock(&connectivity_threads_lock);
895
896   // Let threads waiting on access to the interface list know to move
897   // on such that they'll see the threads termination status
898   pthread_cond_broadcast(&connectivity_cond);
899
900   // Calling pthread_cancel here just assures that the thread is
901   // gone and that the process has been fully terminated.
902
903   DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
904
905   int status = pthread_cancel(connectivity_dequeue_thread_id);
906
907   if (status != 0 && status != ESRCH) {
908     ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
909     status = -1;
910   } else
911     status = 0;
912
913   pthread_mutex_lock(&connectivity_threads_lock);
914   memset(&connectivity_dequeue_thread_id, 0,
915          sizeof(connectivity_dequeue_thread_id));
916   pthread_mutex_unlock(&connectivity_threads_lock);
917
918   DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
919
920   return status;
921 } /* }}} int stop_dequeue_thread */
922
923 static int stop_threads() /* {{{ */
924 {
925   int status = stop_netlink_thread(1);
926   int status2 = stop_dequeue_thread();
927
928   if (status != 0)
929     return status;
930   else
931     return status2;
932 } /* }}} int stop_threads */
933
934 static int connectivity_init(void) /* {{{ */
935 {
936   if (monitor_all_interfaces) {
937     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
938            "be monitored");
939   }
940
941   return start_threads();
942 } /* }}} int connectivity_init */
943
944 static int connectivity_config(const char *key, const char *value) /* {{{ */
945 {
946   if (ignorelist == NULL) {
947     ignorelist = ignorelist_create(/* invert = */ 1);
948
949     if (ignorelist == NULL)
950       return -1;
951   }
952
953   if (strcasecmp(key, "Interface") == 0) {
954     ignorelist_add(ignorelist, value);
955     monitor_all_interfaces = 0;
956   } else if (strcasecmp(key, "IgnoreSelected") == 0) {
957     int invert = 1;
958     if (IS_TRUE(value))
959       invert = 0;
960     ignorelist_set_invert(ignorelist, invert);
961   } else {
962     return -1;
963   }
964
965   return 0;
966 } /* }}} int connectivity_config */
967
968 static int connectivity_read(void) /* {{{ */
969 {
970   pthread_mutex_lock(&connectivity_threads_lock);
971
972   if (connectivity_netlink_thread_error != 0) {
973
974     pthread_mutex_unlock(&connectivity_threads_lock);
975
976     ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
977           "it.");
978
979     stop_netlink_thread(0);
980
981     for (interface_list_t *il = interface_list_head; il != NULL;
982          il = il->next) {
983       il->status = LINK_STATE_UNKNOWN;
984       il->prev_status = LINK_STATE_UNKNOWN;
985       il->sent = 0;
986     }
987
988     start_netlink_thread();
989
990     return -1;
991   } /* if (connectivity_netlink_thread_error != 0) */
992
993   pthread_mutex_unlock(&connectivity_threads_lock);
994
995   return 0;
996 } /* }}} int connectivity_read */
997
998 static int connectivity_shutdown(void) /* {{{ */
999 {
1000   DEBUG("connectivity plugin: Shutting down thread.");
1001
1002   int status = stop_threads();
1003
1004   interface_list_t *il = interface_list_head;
1005   while (il != NULL) {
1006     interface_list_t *il_next;
1007
1008     il_next = il->next;
1009
1010     sfree(il->interface);
1011     sfree(il);
1012
1013     il = il_next;
1014   }
1015
1016   ignorelist_free(ignorelist);
1017
1018   return status;
1019 } /* }}} int connectivity_shutdown */
1020
1021 void module_register(void) {
1022   plugin_register_config("connectivity", connectivity_config, config_keys,
1023                          config_keys_num);
1024   plugin_register_init("connectivity", connectivity_init);
1025   plugin_register_read("connectivity", connectivity_read);
1026   plugin_register_shutdown("connectivity", connectivity_shutdown);
1027 } /* void module_register */