octo code review changes
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33 #include "utils_ignorelist.h"
34
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57
58 #define MYPROTO NETLINK_ROUTE
59
60 #define LINK_STATE_DOWN 0
61 #define LINK_STATE_UP 1
62 #define LINK_STATE_UNKNOWN 2
63
64 #define CONNECTIVITY_DOMAIN_FIELD "domain"
65 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
66 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
67 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
68 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
69 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
70 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
71 #define CONNECTIVITY_PRIORITY_FIELD "priority"
72 #define CONNECTIVITY_PRIORITY_VALUE "high"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
74 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
75 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
76 #define CONNECTIVITY_SEQUENCE_VALUE "0"
77 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
78 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
79 #define CONNECTIVITY_VERSION_FIELD "version"
80 #define CONNECTIVITY_VERSION_VALUE "1.0"
81
82 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
83 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
84 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
85 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
86 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
87 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
89 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
90   "stateChangeFieldsVersion"
91 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
92 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
93
94 /*
95  * Private data types
96  */
97
98 struct interface_list_s {
99   char *interface;
100
101   uint32_t status;
102   uint32_t prev_status;
103   uint32_t sent;
104   long long unsigned int timestamp;
105
106   struct interface_list_s *next;
107 };
108 typedef struct interface_list_s interface_list_t;
109
110 /*
111  * Private variables
112  */
113
114 static ignorelist_t *ignorelist = NULL;
115
116 static interface_list_t *interface_list_head = NULL;
117 static int monitor_all_interfaces = 1;
118
119 static int connectivity_netlink_thread_loop = 0;
120 static int connectivity_netlink_thread_error = 0;
121 static pthread_t connectivity_netlink_thread_id;
122 static int connectivity_dequeue_thread_loop = 0;
123 static int connectivity_dequeue_thread_error = 0;
124 static pthread_t connectivity_dequeue_thread_id;
125 static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
126 static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
127 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
128 // static struct mnl_socket *sock;
129 static int nl_sock = -1;
130 static int event_id = 0;
131 static int unsent_statuses = 0;
132
133 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
134 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
135
136 /*
137  * Prototype
138  */
139
140 static void
141 connectivity_dispatch_notification(const char *interface, const char *type,
142                                    gauge_t value, gauge_t old_value,
143                                    long long unsigned int timestamp);
144
145 /*
146  * Private functions
147  */
148
149 static int gen_message_payload(int state, int old_state, const char *interface,
150                                long long unsigned int timestamp, char **buf) {
151   const unsigned char *buf2;
152   yajl_gen g;
153   char json_str[DATA_MAX_NAME_LEN];
154
155 #if !defined(HAVE_YAJL_V2)
156   yajl_gen_config conf = {};
157
158   conf.beautify = 0;
159 #endif
160
161 #if HAVE_YAJL_V2
162   size_t len;
163   g = yajl_gen_alloc(NULL);
164   yajl_gen_config(g, yajl_gen_beautify, 0);
165 #else
166   unsigned int len;
167   g = yajl_gen_alloc(&conf, NULL);
168 #endif
169
170   yajl_gen_clear(g);
171
172   // *** BEGIN common event header ***
173
174   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
175     goto err;
176
177   // domain
178   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
179                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
180     goto err;
181
182   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
183                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
184     goto err;
185
186   // eventId
187   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
188                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
189       yajl_gen_status_ok)
190     goto err;
191
192   event_id = event_id + 1;
193   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
194   memset(json_str, '\0', DATA_MAX_NAME_LEN);
195   snprintf(json_str, event_id_len, "%d", event_id);
196
197   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
198     goto err;
199   }
200
201   // eventName
202   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
203                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
204       yajl_gen_status_ok)
205     goto err;
206
207   int event_name_len = 0;
208   event_name_len = event_name_len + strlen(interface);    // interface name
209   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
210   event_name_len =
211       event_name_len + 12; // "interface", 2 spaces and null-terminator
212   memset(json_str, '\0', DATA_MAX_NAME_LEN);
213   snprintf(json_str, event_name_len, "interface %s %s", interface,
214            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
215                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
216
217   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
218       yajl_gen_status_ok) {
219     goto err;
220   }
221
222   // lastEpochMicrosec
223   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
224                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
225       yajl_gen_status_ok)
226     goto err;
227
228   int last_epoch_microsec_len =
229       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
230   memset(json_str, '\0', DATA_MAX_NAME_LEN);
231   snprintf(json_str, last_epoch_microsec_len, "%llu",
232            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
233
234   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
235     goto err;
236   }
237
238   // priority
239   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
240                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
241       yajl_gen_status_ok)
242     goto err;
243
244   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
245                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
246       yajl_gen_status_ok)
247     goto err;
248
249   // reportingEntityName
250   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
251                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
252       yajl_gen_status_ok)
253     goto err;
254
255   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
256                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
257       yajl_gen_status_ok)
258     goto err;
259
260   // sequence
261   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
262                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
263       yajl_gen_status_ok)
264     goto err;
265
266   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
267                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
268       yajl_gen_status_ok)
269     goto err;
270
271   // sourceName
272   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
273                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
274       yajl_gen_status_ok)
275     goto err;
276
277   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
278       yajl_gen_status_ok)
279     goto err;
280
281   // startEpochMicrosec
282   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
283                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
284       yajl_gen_status_ok)
285     goto err;
286
287   int start_epoch_microsec_len =
288       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
289   memset(json_str, '\0', DATA_MAX_NAME_LEN);
290   snprintf(json_str, start_epoch_microsec_len, "%llu",
291            (long long unsigned int)timestamp);
292
293   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
294     goto err;
295   }
296
297   // version
298   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
299                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
300     goto err;
301
302   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
303                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
304     goto err;
305
306   // *** END common event header ***
307
308   // *** BEGIN state change fields ***
309
310   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
311                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
312       yajl_gen_status_ok)
313     goto err;
314
315   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
316     goto err;
317
318   // newState
319   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
320                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
321       yajl_gen_status_ok)
322     goto err;
323
324   int new_state_len =
325       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
326                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
327
328   if (yajl_gen_string(
329           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
330                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
331           new_state_len) != yajl_gen_status_ok)
332     goto err;
333
334   // oldState
335   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
336                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
337       yajl_gen_status_ok)
338     goto err;
339
340   int old_state_len =
341       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
342                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
343
344   if (yajl_gen_string(
345           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
346                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
347           old_state_len) != yajl_gen_status_ok)
348     goto err;
349
350   // stateChangeFieldsVersion
351   if (yajl_gen_string(g,
352                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
353                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
354       yajl_gen_status_ok)
355     goto err;
356
357   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
358                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
359       yajl_gen_status_ok)
360     goto err;
361
362   // stateInterface
363   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
364                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
365       yajl_gen_status_ok)
366     goto err;
367
368   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
369       yajl_gen_status_ok)
370     goto err;
371
372   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
373     goto err;
374
375   // *** END state change fields ***
376
377   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
378     goto err;
379
380   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
381     goto err;
382
383   *buf = strdup((char *)buf2);
384
385   if (*buf == NULL) {
386     ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
387           STRERRNO);
388     goto err;
389   }
390
391   yajl_gen_free(g);
392
393   return 0;
394
395 err:
396   yajl_gen_free(g);
397   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
398   return -1;
399 }
400
401 static interface_list_t *add_interface(const char *interface, int status,
402                                        int prev_status) {
403   interface_list_t *il = calloc(1, sizeof(*il));
404
405   if (il == NULL) {
406     ERROR("connectivity plugin: calloc failed during add_interface: %s",
407           STRERRNO);
408     return NULL;
409   }
410
411   char *interface2 = strdup(interface);
412   if (interface2 == NULL) {
413     sfree(il);
414     ERROR("connectivity plugin: strdup failed during add_interface: %s",
415           STRERRNO);
416     return NULL;
417   }
418
419   il->interface = interface2;
420   il->status = status;
421   il->prev_status = prev_status;
422   il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
423   il->sent = 0;
424   il->next = interface_list_head;
425   interface_list_head = il;
426
427   DEBUG("connectivity plugin: added interface %s", interface2);
428
429   return il;
430 }
431
432 static int connectivity_link_state(struct nlmsghdr *msg) {
433   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
434   struct nlattr *attr;
435
436   pthread_mutex_lock(&connectivity_data_lock);
437
438   interface_list_t *il = NULL;
439
440   /* Scan attribute list for device name. */
441   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
442     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
443       continue;
444
445     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
446       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
447             "mnl_attr_validate "
448             "failed.");
449       pthread_mutex_unlock(&connectivity_data_lock);
450       return MNL_CB_ERROR;
451     }
452
453     const char *dev = mnl_attr_get_str(attr);
454
455     // Check the list of interfaces we should monitor, if we've chosen
456     // a subset.  If we don't care about this one, abort.
457     if (ignorelist_match(ignorelist, dev) != 0) {
458       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
459             "interface: %s",
460             dev);
461       break;
462     }
463
464     for (il = interface_list_head; il != NULL; il = il->next)
465       if (strcmp(dev, il->interface) == 0)
466         break;
467
468     uint32_t prev_status;
469
470     if (il == NULL) {
471       // We haven't encountered this interface yet, so add it to the linked list
472       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
473
474       if (il == NULL) {
475         ERROR("connectivity plugin: unable to add interface %s during "
476               "connectivity_link_state",
477               dev);
478         return MNL_CB_ERROR;
479       }
480     }
481
482     prev_status = il->status;
483     il->status =
484         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
485     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
486
487     // If the new status is different than the previous status,
488     // store the previous status and set sent to zero, and set the
489     // global flag to indicate there are statuses to dispatch
490     if (il->status != prev_status) {
491       il->prev_status = prev_status;
492       il->sent = 0;
493       unsent_statuses = 1;
494     }
495
496     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
497           il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
498
499     // no need to loop again, we found the interface name attr
500     // (otherwise the first if-statement in the loop would
501     // have moved us on with 'continue')
502     break;
503   }
504
505   pthread_mutex_unlock(&connectivity_data_lock);
506
507   return 0;
508 }
509
510 static int msg_handler(struct nlmsghdr *msg) {
511   switch (msg->nlmsg_type) {
512   case RTM_NEWADDR:
513   case RTM_DELADDR:
514   case RTM_NEWROUTE:
515   case RTM_DELROUTE:
516   case RTM_DELLINK:
517     // Not of interest in current version
518     break;
519   case RTM_NEWLINK:
520     connectivity_link_state(msg);
521     break;
522   default:
523     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d",
524           msg->nlmsg_type);
525     break;
526   }
527   return 0;
528 }
529
530 static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
531   int ret = 0;
532   int recv_flags = MSG_DONTWAIT;
533
534   if (nl == -1)
535     return ret;
536
537   while (42) {
538     char buf[4096];
539
540     pthread_mutex_lock(&connectivity_threads_lock);
541
542     if (connectivity_netlink_thread_loop <= 0) {
543       pthread_mutex_unlock(&connectivity_threads_lock);
544       return ret;
545     }
546
547     pthread_mutex_unlock(&connectivity_threads_lock);
548
549     int status = recv(nl, buf, sizeof(buf), recv_flags);
550
551     if (status < 0) {
552
553       // If there were no more messages to drain from the socket,
554       // then signal the dequeue thread and allow it to dispatch
555       // any saved interface status changes.  Then continue, but
556       // block and wait for new messages
557       if (errno == EWOULDBLOCK || errno == EAGAIN) {
558         pthread_cond_signal(&connectivity_cond);
559
560         recv_flags = 0;
561         continue;
562       }
563
564       /* Anything else is an error */
565       ERROR("connectivity plugin: read_event: Error recv: %d", status);
566       return status;
567     }
568
569     // Message received successfully, so we'll stop blocking on the
570     // receive call for now (until we get a "would block" error, which
571     // will be handled above)
572     recv_flags = MSG_DONTWAIT;
573
574     if (status == 0) {
575       DEBUG("connectivity plugin: read_event: EOF");
576     }
577
578     /* We need to handle more than one message per 'recvmsg' */
579     for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
580          NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
581       /* Finish reading */
582       if (h->nlmsg_type == NLMSG_DONE)
583         return ret;
584
585       /* Message is some kind of error */
586       if (h->nlmsg_type == NLMSG_ERROR) {
587         ERROR("connectivity plugin: read_event: Message is an error");
588         return -1; // Error
589       }
590
591       /* Call message handler */
592       if (msg_handler) {
593         ret = (*msg_handler)(h);
594         if (ret < 0) {
595           ERROR("connectivity plugin: read_event: Message handler error %d",
596                 ret);
597           return ret;
598         }
599       } else {
600         ERROR("connectivity plugin: read_event: Error NULL message handler");
601         return -1;
602       }
603     }
604   }
605
606   return ret;
607 }
608
609 // NOTE: Caller MUST hold connectivity_data_lock when calling this function
610 static void send_interface_status() {
611   for (interface_list_t *il = interface_list_head; il != NULL;
612        il = il->next) /* {{{ */
613   {
614     uint32_t status;
615     uint32_t prev_status;
616     uint32_t sent;
617
618     status = il->status;
619     prev_status = il->prev_status;
620     sent = il->sent;
621
622     if (status != prev_status && sent == 0) {
623       connectivity_dispatch_notification(il->interface, "gauge", status,
624                                          prev_status, il->timestamp);
625       il->sent = 1;
626     }
627   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
628
629   unsent_statuses = 0;
630 }
631
632 static int read_interface_status() /* {{{ */
633 {
634   pthread_mutex_lock(&connectivity_data_lock);
635
636   if (!unsent_statuses)
637     pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
638
639   send_interface_status();
640
641   pthread_mutex_unlock(&connectivity_data_lock);
642
643   return 0;
644 } /* }}} int *read_interface_status */
645
646 static void *connectivity_netlink_thread(void *arg) /* {{{ */
647 {
648   pthread_mutex_lock(&connectivity_threads_lock);
649
650   while (connectivity_netlink_thread_loop > 0) {
651     pthread_mutex_unlock(&connectivity_threads_lock);
652
653     int status = read_event(nl_sock, msg_handler);
654
655     pthread_mutex_lock(&connectivity_threads_lock);
656
657     if (status < 0) {
658       connectivity_netlink_thread_error = 1;
659       break;
660     }
661   } /* while (connectivity_netlink_thread_loop > 0) */
662
663   pthread_mutex_unlock(&connectivity_threads_lock);
664
665   return ((void *)0);
666 } /* }}} void *connectivity_netlink_thread */
667
668 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
669 {
670   pthread_mutex_lock(&connectivity_threads_lock);
671
672   while (connectivity_dequeue_thread_loop > 0) {
673     pthread_mutex_unlock(&connectivity_threads_lock);
674
675     int status = read_interface_status();
676
677     pthread_mutex_lock(&connectivity_threads_lock);
678
679     if (status < 0) {
680       connectivity_dequeue_thread_error = 1;
681       break;
682     }
683   } /* while (connectivity_dequeue_thread_loop > 0) */
684
685   pthread_mutex_unlock(&connectivity_threads_lock);
686
687   return ((void *)0);
688 } /* }}} void *connectivity_dequeue_thread */
689
690 static int nl_connect() {
691   struct sockaddr_nl sa_nl = {
692       .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
693   };
694
695   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
696   if (nl_sock == -1) {
697     ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
698     return -1;
699   }
700
701   int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
702   if (rc == -1) {
703     ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
704     close(nl_sock);
705     return -1;
706   }
707
708   return 0;
709 }
710
711 static int start_netlink_thread(void) /* {{{ */
712 {
713   int status;
714
715   pthread_mutex_lock(&connectivity_threads_lock);
716
717   if (connectivity_netlink_thread_loop != 0) {
718     pthread_mutex_unlock(&connectivity_threads_lock);
719     return (0);
720   }
721
722   connectivity_netlink_thread_loop = 1;
723   connectivity_netlink_thread_error = 0;
724
725   if (nl_sock == -1) {
726     status = nl_connect();
727
728     if (status != 0) {
729       pthread_mutex_unlock(&connectivity_threads_lock);
730       return status;
731     }
732   }
733
734   status = plugin_thread_create(&connectivity_netlink_thread_id,
735                                 /* attr = */ NULL, connectivity_netlink_thread,
736                                 /* arg = */ (void *)0, "connectivity");
737   if (status != 0) {
738     connectivity_netlink_thread_loop = 0;
739     ERROR("connectivity plugin: Starting thread failed.");
740     pthread_mutex_unlock(&connectivity_threads_lock);
741
742     int status2 = close(nl_sock);
743
744     if (status2 != 0) {
745       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
746             status2, STRERRNO);
747     } else
748       nl_sock = -1;
749
750     return (-1);
751   }
752
753   pthread_mutex_unlock(&connectivity_threads_lock);
754
755   return status;
756 }
757
758 static int start_dequeue_thread(void) /* {{{ */
759 {
760   pthread_mutex_lock(&connectivity_threads_lock);
761
762   if (connectivity_dequeue_thread_loop != 0) {
763     pthread_mutex_unlock(&connectivity_threads_lock);
764     return (0);
765   }
766
767   connectivity_dequeue_thread_loop = 1;
768   connectivity_dequeue_thread_error = 0;
769
770   int status =
771       plugin_thread_create(&connectivity_dequeue_thread_id,
772                            /* attr = */ NULL, connectivity_dequeue_thread,
773                            /* arg = */ (void *)0, "connectivity");
774   if (status != 0) {
775     connectivity_dequeue_thread_loop = 0;
776     ERROR("connectivity plugin: Starting dequeue thread failed.");
777     pthread_mutex_unlock(&connectivity_threads_lock);
778     return (-1);
779   }
780
781   pthread_mutex_unlock(&connectivity_threads_lock);
782
783   return status;
784 } /* }}} int start_dequeue_thread */
785
786 static int start_threads(void) /* {{{ */
787 {
788   int status = start_netlink_thread();
789   int status2 = start_dequeue_thread();
790
791   if (status != 0)
792     return status;
793   else
794     return status2;
795 } /* }}} int start_threads */
796
797 static int stop_netlink_thread(int shutdown) /* {{{ */
798 {
799   int socket_status, thread_stratus;
800
801   if (nl_sock != -1) {
802     socket_status = close(nl_sock);
803     if (socket_status != 0) {
804       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
805             socket_status, STRERRNO);
806     } else
807       nl_sock = -1;
808   } else
809     socket_status = 0;
810
811   pthread_mutex_lock(&connectivity_threads_lock);
812
813   if (connectivity_netlink_thread_loop == 0) {
814     pthread_mutex_unlock(&connectivity_threads_lock);
815     // Thread has already been terminated, nothing more to attempt
816     return socket_status;
817   }
818
819   // Set thread termination status
820   connectivity_netlink_thread_loop = 0;
821   pthread_mutex_unlock(&connectivity_threads_lock);
822
823   // Let threads waiting on access to the interface list know to move
824   // on such that they'll see the threads termination status
825   pthread_cond_broadcast(&connectivity_cond);
826
827   if (shutdown == 1) {
828     // Since the thread is blocking, calling pthread_join
829     // doesn't actually succeed in stopping it.  It will stick around
830     // until a NETLINK message is received on the socket (at which
831     // it will realize that "connectivity_netlink_thread_loop" is 0 and will
832     // break out of the read loop and be allowed to die).  This is
833     // fine when the process isn't supposed to be exiting, but in
834     // the case of a process shutdown, we don't want to have an
835     // idle thread hanging around.  Calling pthread_cancel here in
836     // the case of a shutdown is just assures that the thread is
837     // gone and that the process has been fully terminated.
838
839     DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
840
841     thread_stratus = pthread_cancel(connectivity_netlink_thread_id);
842
843     if (thread_stratus != 0 && thread_stratus != ESRCH) {
844       ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
845             thread_stratus);
846       thread_stratus = -1;
847     } else
848       thread_stratus = 0;
849   } else {
850     thread_stratus =
851         pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
852     if (thread_stratus != 0 && thread_stratus != ESRCH) {
853       ERROR("connectivity plugin: Stopping netlink thread failed: %d",
854             thread_stratus);
855       thread_stratus = -1;
856     } else
857       thread_stratus = 0;
858   }
859
860   pthread_mutex_lock(&connectivity_threads_lock);
861   memset(&connectivity_netlink_thread_id, 0,
862          sizeof(connectivity_netlink_thread_id));
863   connectivity_netlink_thread_error = 0;
864   pthread_mutex_unlock(&connectivity_threads_lock);
865
866   DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
867
868   if (socket_status != 0)
869     return socket_status;
870   else
871     return thread_stratus;
872 }
873
874 static int stop_dequeue_thread(int shutdown) /* {{{ */
875 {
876   int status;
877
878   pthread_mutex_lock(&connectivity_threads_lock);
879
880   if (connectivity_dequeue_thread_loop == 0) {
881     pthread_mutex_unlock(&connectivity_threads_lock);
882     return (-1);
883   }
884
885   // Set thread termination status
886   connectivity_dequeue_thread_loop = 0;
887   pthread_mutex_unlock(&connectivity_threads_lock);
888
889   // Let threads waiting on access to the interface list know to move
890   // on such that they'll see the threads termination status
891   pthread_cond_broadcast(&connectivity_cond);
892
893   if (shutdown == 1) {
894     // Calling pthread_cancel here in
895     // the case of a shutdown just assures that the thread is
896     // gone and that the process has been fully terminated.
897
898     DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
899
900     status = pthread_cancel(connectivity_dequeue_thread_id);
901
902     if (status != 0 && status != ESRCH) {
903       ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
904       status = -1;
905     } else
906       status = 0;
907   } else {
908     status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
909     if (status != 0 && status != ESRCH) {
910       ERROR("connectivity plugin: Stopping dequeue thread failed.");
911       status = -1;
912     } else
913       status = 0;
914   }
915
916   pthread_mutex_lock(&connectivity_threads_lock);
917   memset(&connectivity_dequeue_thread_id, 0,
918          sizeof(connectivity_dequeue_thread_id));
919   connectivity_dequeue_thread_error = 0;
920   pthread_mutex_unlock(&connectivity_threads_lock);
921
922   DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
923
924   return (status);
925 } /* }}} int stop_dequeue_thread */
926
927 static int stop_threads(int shutdown) /* {{{ */
928 {
929   int status = stop_netlink_thread(shutdown);
930   int status2 = stop_dequeue_thread(shutdown);
931
932   if (status != 0)
933     return status;
934   else
935     return status2;
936 } /* }}} int stop_threads */
937
938 static int connectivity_init(void) /* {{{ */
939 {
940   if (monitor_all_interfaces) {
941     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
942            "be monitored");
943   }
944
945   return (start_threads());
946 } /* }}} int connectivity_init */
947
948 static int connectivity_config(const char *key, const char *value) /* {{{ */
949 {
950   if (ignorelist == NULL) {
951     ignorelist = ignorelist_create(/* invert = */ 1);
952   }
953
954   if (strcasecmp(key, "Interface") == 0) {
955     ignorelist_add(ignorelist, value);
956     monitor_all_interfaces = 0;
957   } else if (strcasecmp(key, "IgnoreSelected") == 0) {
958     int invert = 1;
959     if (IS_TRUE(value))
960       invert = 0;
961     ignorelist_set_invert(ignorelist, invert);
962   } else {
963     return (-1);
964   }
965
966   return (0);
967 } /* }}} int connectivity_config */
968
969 static void
970 connectivity_dispatch_notification(const char *interface, const char *type,
971                                    gauge_t value, gauge_t old_value,
972                                    long long unsigned int timestamp) {
973   char *buf = NULL;
974   notification_t n = {
975       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
976
977   if (value == LINK_STATE_UP)
978     n.severity = NOTIF_OKAY;
979
980   sstrncpy(n.host, hostname_g, sizeof(n.host));
981   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
982   sstrncpy(n.type, "gauge", sizeof(n.type));
983   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
984
985   gen_message_payload(value, old_value, interface, timestamp, &buf);
986
987   notification_meta_t *m = calloc(1, sizeof(*m));
988
989   if (m == NULL) {
990     sfree(buf);
991     ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO);
992     return;
993   }
994
995   sstrncpy(m->name, "ves", sizeof(m->name));
996   m->nm_value.nm_string = sstrdup(buf);
997   m->type = NM_TYPE_STRING;
998   n.meta = m;
999
1000   DEBUG("connectivity plugin: notification message: %s",
1001         n.meta->nm_value.nm_string);
1002
1003   DEBUG("connectivity plugin: dispatching state %d for interface %s",
1004         (int)value, interface);
1005
1006   plugin_dispatch_notification(&n);
1007   plugin_notification_meta_free(n.meta);
1008
1009   // strdup'd in gen_message_payload
1010   if (buf != NULL)
1011     sfree(buf);
1012 }
1013
1014 static int connectivity_read(void) /* {{{ */
1015 {
1016   pthread_mutex_lock(&connectivity_threads_lock);
1017
1018   if (connectivity_netlink_thread_error != 0) {
1019
1020     pthread_mutex_unlock(&connectivity_threads_lock);
1021
1022     ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
1023           "it.");
1024
1025     stop_netlink_thread(0);
1026
1027     for (interface_list_t *il = interface_list_head; il != NULL;
1028          il = il->next) {
1029       il->status = LINK_STATE_UNKNOWN;
1030       il->prev_status = LINK_STATE_UNKNOWN;
1031       il->sent = 0;
1032     }
1033
1034     start_netlink_thread();
1035
1036     return (-1);
1037   } /* if (connectivity_netlink_thread_error != 0) */
1038
1039   if (connectivity_dequeue_thread_error != 0) {
1040
1041     pthread_mutex_unlock(&connectivity_threads_lock);
1042
1043     ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
1044           "it.");
1045
1046     stop_dequeue_thread(0);
1047
1048     start_dequeue_thread();
1049
1050     return (-1);
1051   } /* if (connectivity_dequeue_thread_error != 0) */
1052
1053   pthread_mutex_unlock(&connectivity_threads_lock);
1054
1055   return (0);
1056 } /* }}} int connectivity_read */
1057
1058 static int connectivity_shutdown(void) /* {{{ */
1059 {
1060   DEBUG("connectivity plugin: Shutting down thread.");
1061   if (stop_threads(1) < 0)
1062     return (-1);
1063
1064   interface_list_t *il = interface_list_head;
1065   while (il != NULL) {
1066     interface_list_t *il_next;
1067
1068     il_next = il->next;
1069
1070     sfree(il->interface);
1071     sfree(il);
1072
1073     il = il_next;
1074   }
1075
1076   ignorelist_free(ignorelist);
1077
1078   return (0);
1079 } /* }}} int connectivity_shutdown */
1080
1081 void module_register(void) {
1082   plugin_register_config("connectivity", connectivity_config, config_keys,
1083                          config_keys_num);
1084   plugin_register_init("connectivity", connectivity_init);
1085   plugin_register_read("connectivity", connectivity_read);
1086   plugin_register_shutdown("connectivity", connectivity_shutdown);
1087 } /* void module_register */