Use a separate dequeue thread to dispatch notifications
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33 #include "utils_ignorelist.h"
34
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57
58 #define MYPROTO NETLINK_ROUTE
59
60 #define LINK_STATE_DOWN 0
61 #define LINK_STATE_UP 1
62 #define LINK_STATE_UNKNOWN 2
63
64 #define CONNECTIVITY_DOMAIN_FIELD "domain"
65 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
66 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
67 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
68 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
69 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
70 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
71 #define CONNECTIVITY_PRIORITY_FIELD "priority"
72 #define CONNECTIVITY_PRIORITY_VALUE "high"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
74 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
75 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
76 #define CONNECTIVITY_SEQUENCE_VALUE "0"
77 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
78 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
79 #define CONNECTIVITY_VERSION_FIELD "version"
80 #define CONNECTIVITY_VERSION_VALUE "1.0"
81
82 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
83 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
84 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
85 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
86 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
87 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
89 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
90   "stateChangeFieldsVersion"
91 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
92 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
93
94 /*
95  * Private data types
96  */
97
98 struct interface_list_s {
99   char *interface;
100
101   uint32_t status;
102   uint32_t prev_status;
103   uint32_t sent;
104   long long unsigned int timestamp;
105
106   struct interface_list_s *next;
107 };
108 typedef struct interface_list_s interface_list_t;
109
110 /*
111  * Private variables
112  */
113
114 static ignorelist_t *ignorelist = NULL;
115
116 static interface_list_t *interface_list_head = NULL;
117 static int monitor_all_interfaces = 1;
118
119 static int connectivity_netlink_thread_loop = 0;
120 static int connectivity_netlink_thread_error = 0;
121 static pthread_t connectivity_netlink_thread_id;
122 static int connectivity_dequeue_thread_loop = 0;
123 static int connectivity_dequeue_thread_error = 0;
124 static pthread_t connectivity_dequeue_thread_id;
125 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
126 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
127 // static struct mnl_socket *sock;
128 static int nl_sock = -1;
129 static int event_id = 0;
130
131 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
132 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
133
134 /*
135  * Prototype
136  */
137
138 static void
139 connectivity_dispatch_notification(const char *interface, const char *type,
140                                    gauge_t value, gauge_t old_value,
141                                    long long unsigned int timestamp);
142
143 /*
144  * Private functions
145  */
146
147 static int gen_message_payload(int state, int old_state, const char *interface,
148                                long long unsigned int timestamp, char **buf) {
149   const unsigned char *buf2;
150   yajl_gen g;
151   char json_str[DATA_MAX_NAME_LEN];
152
153 #if !defined(HAVE_YAJL_V2)
154   yajl_gen_config conf = {};
155
156   conf.beautify = 0;
157 #endif
158
159 #if HAVE_YAJL_V2
160   size_t len;
161   g = yajl_gen_alloc(NULL);
162   yajl_gen_config(g, yajl_gen_beautify, 0);
163 #else
164   unsigned int len;
165   g = yajl_gen_alloc(&conf, NULL);
166 #endif
167
168   yajl_gen_clear(g);
169
170   // *** BEGIN common event header ***
171
172   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
173     goto err;
174
175   // domain
176   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
177                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
178     goto err;
179
180   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
181                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
182     goto err;
183
184   // eventId
185   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
186                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
187       yajl_gen_status_ok)
188     goto err;
189
190   event_id = event_id + 1;
191   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
192   memset(json_str, '\0', DATA_MAX_NAME_LEN);
193   snprintf(json_str, event_id_len, "%d", event_id);
194
195   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
196     goto err;
197   }
198
199   // eventName
200   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
201                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
202       yajl_gen_status_ok)
203     goto err;
204
205   int event_name_len = 0;
206   event_name_len = event_name_len + strlen(interface);    // interface name
207   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
208   event_name_len =
209       event_name_len + 12; // "interface", 2 spaces and null-terminator
210   memset(json_str, '\0', DATA_MAX_NAME_LEN);
211   snprintf(json_str, event_name_len, "interface %s %s", interface,
212            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
213                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
214
215   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
216       yajl_gen_status_ok) {
217     goto err;
218   }
219
220   // lastEpochMicrosec
221   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
222                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
223       yajl_gen_status_ok)
224     goto err;
225
226   int last_epoch_microsec_len =
227       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
228   memset(json_str, '\0', DATA_MAX_NAME_LEN);
229   snprintf(json_str, last_epoch_microsec_len, "%llu",
230            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
231
232   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
233     goto err;
234   }
235
236   // priority
237   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
238                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
239       yajl_gen_status_ok)
240     goto err;
241
242   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
243                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
244       yajl_gen_status_ok)
245     goto err;
246
247   // reportingEntityName
248   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
249                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
250       yajl_gen_status_ok)
251     goto err;
252
253   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
254                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
255       yajl_gen_status_ok)
256     goto err;
257
258   // sequence
259   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
260                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
261       yajl_gen_status_ok)
262     goto err;
263
264   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
265                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
266       yajl_gen_status_ok)
267     goto err;
268
269   // sourceName
270   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
271                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
272       yajl_gen_status_ok)
273     goto err;
274
275   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
276       yajl_gen_status_ok)
277     goto err;
278
279   // startEpochMicrosec
280   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
281                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
282       yajl_gen_status_ok)
283     goto err;
284
285   int start_epoch_microsec_len =
286       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
287   memset(json_str, '\0', DATA_MAX_NAME_LEN);
288   snprintf(json_str, start_epoch_microsec_len, "%llu",
289            (long long unsigned int)timestamp);
290
291   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
292     goto err;
293   }
294
295   // version
296   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
297                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
298     goto err;
299
300   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
301                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
302     goto err;
303
304   // *** END common event header ***
305
306   // *** BEGIN state change fields ***
307
308   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
309                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
310       yajl_gen_status_ok)
311     goto err;
312
313   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
314     goto err;
315
316   // newState
317   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
318                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
319       yajl_gen_status_ok)
320     goto err;
321
322   int new_state_len =
323       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
324                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
325
326   if (yajl_gen_string(
327           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
328                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
329           new_state_len) != yajl_gen_status_ok)
330     goto err;
331
332   // oldState
333   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
334                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
335       yajl_gen_status_ok)
336     goto err;
337
338   int old_state_len =
339       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
340                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
341
342   if (yajl_gen_string(
343           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
344                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
345           old_state_len) != yajl_gen_status_ok)
346     goto err;
347
348   // stateChangeFieldsVersion
349   if (yajl_gen_string(g,
350                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
351                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
352       yajl_gen_status_ok)
353     goto err;
354
355   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
356                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
357       yajl_gen_status_ok)
358     goto err;
359
360   // stateInterface
361   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
362                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
363       yajl_gen_status_ok)
364     goto err;
365
366   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
367       yajl_gen_status_ok)
368     goto err;
369
370   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
371     goto err;
372
373   // *** END state change fields ***
374
375   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
376     goto err;
377
378   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
379     goto err;
380
381   *buf = malloc(strlen((char *)buf2) + 1);
382
383   if (*buf == NULL) {
384     char errbuf[1024];
385     ERROR("connectivity plugin: malloc failed during gen_message_payload: %s",
386           sstrerror(errno, errbuf, sizeof(errbuf)));
387     goto err;
388   }
389
390   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
391
392   yajl_gen_free(g);
393
394   return 0;
395
396 err:
397   yajl_gen_free(g);
398   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
399   return -1;
400 }
401
402 static interface_list_t *add_interface(const char *interface, int status,
403                                        int prev_status) {
404   interface_list_t *il;
405   char *interface2;
406
407   il = malloc(sizeof(*il));
408   if (il == NULL) {
409     char errbuf[1024];
410     ERROR("connectivity plugin: malloc failed during add_interface: %s",
411           sstrerror(errno, errbuf, sizeof(errbuf)));
412     return NULL;
413   }
414
415   interface2 = strdup(interface);
416   if (interface2 == NULL) {
417     char errbuf[1024];
418     sfree(il);
419     ERROR("connectivity plugin: strdup failed during add_interface: %s",
420           sstrerror(errno, errbuf, sizeof(errbuf)));
421     return NULL;
422   }
423
424   il->interface = interface2;
425   il->status = status;
426   il->prev_status = prev_status;
427   il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
428   il->sent = 0;
429   il->next = interface_list_head;
430   interface_list_head = il;
431
432   DEBUG("connectivity plugin: added interface %s", interface2);
433
434   return il;
435 }
436
437 static int connectivity_link_state(struct nlmsghdr *msg) {
438   int retval = 0;
439   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
440   struct nlattr *attr;
441   const char *dev = NULL;
442
443   pthread_mutex_lock(&connectivity_lock);
444
445   interface_list_t *il = NULL;
446
447   /* Scan attribute list for device name. */
448   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
449     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
450       continue;
451
452     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
453       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
454             "mnl_attr_validate "
455             "failed.");
456       pthread_mutex_unlock(&connectivity_lock);
457       return MNL_CB_ERROR;
458     }
459
460     dev = mnl_attr_get_str(attr);
461
462     // Check the list of interfaces we should monitor, if we've chosen
463     // a subset.  If we don't care about this one, abort.
464     if (ignorelist_match(ignorelist, dev) != 0) {
465       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
466             "interface: %s",
467             dev);
468       break;
469     }
470
471     for (il = interface_list_head; il != NULL; il = il->next)
472       if (strcmp(dev, il->interface) == 0)
473         break;
474
475     uint32_t prev_status;
476
477     if (il == NULL) {
478       // We haven't encountered this interface yet, so add it to the linked list
479       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
480
481       if (il == NULL) {
482         ERROR("connectivity plugin: unable to add interface %s during "
483               "connectivity_link_state",
484               dev);
485         return MNL_CB_ERROR;
486       }
487     }
488
489     prev_status = il->status;
490     il->status =
491         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
492     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
493
494     // If the new status is different than the previous status,
495     // store the previous status and set sent to zero
496     if (il->status != prev_status) {
497       il->prev_status = prev_status;
498       il->sent = 0;
499     }
500
501     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
502           il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
503
504     // no need to loop again, we found the interface name attr
505     // (otherwise the first if-statement in the loop would
506     // have moved us on with 'continue')
507     break;
508   }
509
510   pthread_mutex_unlock(&connectivity_lock);
511
512   return retval;
513 }
514
515 static int msg_handler(struct nlmsghdr *msg) {
516   switch (msg->nlmsg_type) {
517   case RTM_NEWADDR:
518   case RTM_DELADDR:
519   case RTM_NEWROUTE:
520   case RTM_DELROUTE:
521   case RTM_DELLINK:
522     // Not of interest in current version
523     break;
524   case RTM_NEWLINK:
525     connectivity_link_state(msg);
526     break;
527   default:
528     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
529           msg->nlmsg_type);
530     break;
531   }
532   return 0;
533 }
534
535 // static int read_event(struct mnl_socket *nl,
536 //                       int (*msg_handler)(struct nlmsghdr *)) {
537 static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
538   int status;
539   int ret = 0;
540   char buf[4096];
541   struct nlmsghdr *h;
542   int recv_flags = MSG_DONTWAIT;
543
544   // if (nl == NULL)
545   //   return ret;
546
547   if (nl == -1)
548     return ret;
549
550   while (42) {
551     pthread_mutex_lock(&connectivity_lock);
552
553     if (connectivity_netlink_thread_loop <= 0) {
554       pthread_mutex_unlock(&connectivity_lock);
555       return ret;
556     }
557
558     pthread_mutex_unlock(&connectivity_lock);
559
560     status = recv(nl, buf, sizeof(buf), recv_flags);
561
562     if (status < 0) {
563
564       // If there were no more messages to drain from the socket,
565       // then signal the dequeue thread and allow it to dispatch
566       // any saved interface status changes.  Then continue, but
567       // block and wait for new messages
568       if (errno == EWOULDBLOCK || errno == EAGAIN) {
569         pthread_mutex_lock(&connectivity_lock);
570         pthread_cond_signal(&connectivity_cond);
571         pthread_mutex_unlock(&connectivity_lock);
572
573         recv_flags = 0;
574         continue;
575       }
576
577       /* Anything else is an error */
578       // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom:
579       // %d\n",
580       //       status);
581       ERROR("connectivity plugin: read_event: Error recv: %d\n", status);
582       return status;
583     }
584
585     // Message received successfully, so we'll stop blocking on the
586     // receive call for now (until we get a "would block" error, which
587     // will be handled above)
588     recv_flags = MSG_DONTWAIT;
589
590     if (status == 0) {
591       DEBUG("connectivity plugin: read_event: EOF\n");
592     }
593
594     /* We need to handle more than one message per 'recvmsg' */
595     for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
596          h = NLMSG_NEXT(h, status)) {
597       /* Finish reading */
598       if (h->nlmsg_type == NLMSG_DONE)
599         return ret;
600
601       /* Message is some kind of error */
602       if (h->nlmsg_type == NLMSG_ERROR) {
603         ERROR("connectivity plugin: read_event: Message is an error\n");
604         return -1; // Error
605       }
606
607       /* Call message handler */
608       if (msg_handler) {
609         ret = (*msg_handler)(h);
610         if (ret < 0) {
611           ERROR("connectivity plugin: read_event: Message handler error %d\n",
612                 ret);
613           return ret;
614         }
615       } else {
616         ERROR("connectivity plugin: read_event: Error NULL message handler\n");
617         return -1;
618       }
619     }
620   }
621
622   return ret;
623 }
624
625 static void send_interface_status() {
626   for (interface_list_t *il = interface_list_head; il != NULL;
627        il = il->next) /* {{{ */
628   {
629     uint32_t status;
630     uint32_t prev_status;
631     uint32_t sent;
632
633     status = il->status;
634     prev_status = il->prev_status;
635     sent = il->sent;
636
637     if (status != prev_status && sent == 0) {
638       connectivity_dispatch_notification(il->interface, "gauge", status,
639                                          prev_status, il->timestamp);
640       il->sent = 1;
641     }
642   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
643 }
644
645 static int read_interface_status() /* {{{ */
646 {
647   pthread_mutex_lock(&connectivity_lock);
648
649   // This first attempt is necessary because the netlink thread
650   // might have held the lock while this thread was blocked on
651   // the lock acquisition just above.  And while the netlink thread
652   // had the lock, it could have called pthread_cond_singal, which
653   // obviously wouldn't have woken this thread, since this thread
654   // was not yet waiting on the condition signal.  So we need to
655   // loop through the interfaces and check if any have changed
656   // status before we wait on the condition signal
657   send_interface_status();
658
659   pthread_cond_wait(&connectivity_cond, &connectivity_lock);
660
661   send_interface_status();
662
663   pthread_mutex_unlock(&connectivity_lock);
664
665   return 0;
666 } /* }}} int *read_interface_status */
667
668 static void *connectivity_netlink_thread(void *arg) /* {{{ */
669 {
670   pthread_mutex_lock(&connectivity_lock);
671
672   while (connectivity_netlink_thread_loop > 0) {
673     int status;
674
675     pthread_mutex_unlock(&connectivity_lock);
676
677     status = read_event(nl_sock, msg_handler);
678
679     pthread_mutex_lock(&connectivity_lock);
680
681     if (status < 0) {
682       connectivity_netlink_thread_error = 1;
683       break;
684     }
685
686     if (connectivity_netlink_thread_loop <= 0)
687       break;
688   } /* while (connectivity_netlink_thread_loop > 0) */
689
690   pthread_mutex_unlock(&connectivity_lock);
691
692   return ((void *)0);
693 } /* }}} void *connectivity_netlink_thread */
694
695 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
696 {
697   pthread_mutex_lock(&connectivity_lock);
698
699   while (connectivity_dequeue_thread_loop > 0) {
700     int status;
701
702     pthread_mutex_unlock(&connectivity_lock);
703
704     status = read_interface_status();
705
706     pthread_mutex_lock(&connectivity_lock);
707
708     if (status < 0) {
709       connectivity_dequeue_thread_error = 1;
710       break;
711     }
712
713     if (connectivity_dequeue_thread_loop <= 0)
714       break;
715   } /* while (connectivity_dequeue_thread_loop > 0) */
716
717   pthread_mutex_unlock(&connectivity_lock);
718
719   return ((void *)0);
720 } /* }}} void *connectivity_dequeue_thread */
721
722 static int nl_connect() {
723   int rc;
724   struct sockaddr_nl sa_nl;
725
726   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
727   if (nl_sock == -1) {
728     ERROR("connectivity plugin: socket open failed: %d", errno);
729     return -1;
730   }
731
732   sa_nl.nl_family = AF_NETLINK;
733   sa_nl.nl_groups = RTMGRP_LINK;
734   sa_nl.nl_pid = getpid();
735
736   rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
737   if (rc == -1) {
738     ERROR("connectivity plugin: socket bind failed: %d", errno);
739     close(nl_sock);
740     return -1;
741   }
742
743   return 0;
744 }
745
746 static int start_netlink_thread(void) /* {{{ */
747 {
748   int status;
749
750   pthread_mutex_lock(&connectivity_lock);
751
752   if (connectivity_netlink_thread_loop != 0) {
753     pthread_mutex_unlock(&connectivity_lock);
754     return (0);
755   }
756
757   connectivity_netlink_thread_loop = 1;
758   connectivity_netlink_thread_error = 0;
759
760   if (nl_sock == -1) {
761     status = nl_connect();
762
763     if (status != 0)
764       return status;
765   }
766
767   status = plugin_thread_create(&connectivity_netlink_thread_id,
768                                 /* attr = */ NULL, connectivity_netlink_thread,
769                                 /* arg = */ (void *)0, "connectivity");
770   if (status != 0) {
771     connectivity_netlink_thread_loop = 0;
772     ERROR("connectivity plugin: Starting thread failed.");
773     pthread_mutex_unlock(&connectivity_lock);
774
775     int status2 = close(nl_sock);
776
777     if (status2 != 0) {
778       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
779             status2, strerror(errno));
780     } else
781       nl_sock = -1;
782
783     return (-1);
784   }
785
786   pthread_mutex_unlock(&connectivity_lock);
787
788   return status;
789 }
790
791 static int start_dequeue_thread(void) /* {{{ */
792 {
793   int status;
794
795   pthread_mutex_lock(&connectivity_lock);
796
797   if (connectivity_dequeue_thread_loop != 0) {
798     pthread_mutex_unlock(&connectivity_lock);
799     return (0);
800   }
801
802   connectivity_dequeue_thread_loop = 1;
803   connectivity_dequeue_thread_error = 0;
804
805   status = plugin_thread_create(&connectivity_dequeue_thread_id,
806                                 /* attr = */ NULL, connectivity_dequeue_thread,
807                                 /* arg = */ (void *)0, "connectivity");
808   if (status != 0) {
809     connectivity_dequeue_thread_loop = 0;
810     ERROR("connectivity plugin: Starting dequeue thread failed.");
811     pthread_mutex_unlock(&connectivity_lock);
812     return (-1);
813   }
814
815   pthread_mutex_unlock(&connectivity_lock);
816
817   return status;
818 } /* }}} int start_dequeue_thread */
819
820 static int start_threads(void) /* {{{ */
821 {
822   int status, status2;
823
824   status = start_netlink_thread();
825   status2 = start_dequeue_thread();
826
827   if (status < 0)
828     return status;
829   else
830     return status2;
831 } /* }}} int start_threads */
832
833 static int stop_netlink_thread(int shutdown) /* {{{ */
834 {
835   int status;
836
837   if (nl_sock != -1) {
838     status = close(nl_sock);
839     if (status != 0) {
840       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
841             status, strerror(errno));
842       return (-1);
843     } else
844       nl_sock = -1;
845   }
846
847   pthread_mutex_lock(&connectivity_lock);
848
849   if (connectivity_netlink_thread_loop == 0) {
850     pthread_mutex_unlock(&connectivity_lock);
851     return (-1);
852   }
853
854   connectivity_netlink_thread_loop = 0;
855   pthread_cond_broadcast(&connectivity_cond);
856   pthread_mutex_unlock(&connectivity_lock);
857
858   if (shutdown == 1) {
859     // Since the thread is blocking, calling pthread_join
860     // doesn't actually succeed in stopping it.  It will stick around
861     // until a NETLINK message is received on the socket (at which
862     // it will realize that "connectivity_netlink_thread_loop" is 0 and will
863     // break out of the read loop and be allowed to die).  This is
864     // fine when the process isn't supposed to be exiting, but in
865     // the case of a process shutdown, we don't want to have an
866     // idle thread hanging around.  Calling pthread_cancel here in
867     // the case of a shutdown is just assures that the thread is
868     // gone and that the process has been fully terminated.
869
870     DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
871
872     status = pthread_cancel(connectivity_netlink_thread_id);
873
874     if (status != 0 && status != ESRCH) {
875       ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status);
876       status = -1;
877     } else
878       status = 0;
879   } else {
880     status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
881     if (status != 0 && status != ESRCH) {
882       ERROR("connectivity plugin: Stopping netlink thread failed.");
883       status = -1;
884     } else
885       return 0;
886   }
887
888   pthread_mutex_lock(&connectivity_lock);
889   memset(&connectivity_netlink_thread_id, 0,
890          sizeof(connectivity_netlink_thread_id));
891   connectivity_netlink_thread_error = 0;
892   pthread_mutex_unlock(&connectivity_lock);
893
894   DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
895
896   return status;
897 }
898
899 static int stop_dequeue_thread(int shutdown) /* {{{ */
900 {
901   int status;
902
903   pthread_mutex_lock(&connectivity_lock);
904
905   if (connectivity_dequeue_thread_loop == 0) {
906     pthread_mutex_unlock(&connectivity_lock);
907     return (-1);
908   }
909
910   connectivity_dequeue_thread_loop = 0;
911   pthread_cond_broadcast(&connectivity_cond);
912   pthread_mutex_unlock(&connectivity_lock);
913
914   if (shutdown == 1) {
915     // Calling pthread_cancel here in
916     // the case of a shutdown just assures that the thread is
917     // gone and that the process has been fully terminated.
918
919     DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
920
921     status = pthread_cancel(connectivity_dequeue_thread_id);
922
923     if (status != 0 && status != ESRCH) {
924       ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
925       status = -1;
926     } else
927       status = 0;
928   } else {
929     status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
930     if (status != 0 && status != ESRCH) {
931       ERROR("connectivity plugin: Stopping dequeue thread failed.");
932       status = -1;
933     } else
934       status = 0;
935   }
936
937   pthread_mutex_lock(&connectivity_lock);
938   memset(&connectivity_dequeue_thread_id, 0,
939          sizeof(connectivity_dequeue_thread_id));
940   connectivity_dequeue_thread_error = 0;
941   pthread_mutex_unlock(&connectivity_lock);
942
943   DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
944
945   return (status);
946 } /* }}} int stop_dequeue_thread */
947
948 static int stop_threads(int shutdown) /* {{{ */
949 {
950   int status, status2;
951
952   status = stop_netlink_thread(shutdown);
953   status2 = stop_dequeue_thread(shutdown);
954
955   if (status < 0)
956     return status;
957   else
958     return status2;
959 } /* }}} int stop_threads */
960
961 static int connectivity_init(void) /* {{{ */
962 {
963   if (monitor_all_interfaces) {
964     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
965            "be monitored");
966   }
967
968   return (start_threads());
969 } /* }}} int connectivity_init */
970
971 static int connectivity_config(const char *key, const char *value) /* {{{ */
972 {
973   if (ignorelist == NULL) {
974     ignorelist = ignorelist_create(/* invert = */ 1);
975   }
976
977   if (strcasecmp(key, "Interface") == 0) {
978     ignorelist_add(ignorelist, value);
979     monitor_all_interfaces = 0;
980   } else if (strcasecmp(key, "IgnoreSelected") == 0) {
981     int invert = 1;
982     if (IS_TRUE(value))
983       invert = 0;
984     ignorelist_set_invert(ignorelist, invert);
985   } else {
986     return (-1);
987   }
988
989   return (0);
990 } /* }}} int connectivity_config */
991
992 static void
993 connectivity_dispatch_notification(const char *interface, const char *type,
994                                    gauge_t value, gauge_t old_value,
995                                    long long unsigned int timestamp) {
996   char *buf = NULL;
997   notification_t n = {
998       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
999
1000   if (value == LINK_STATE_UP)
1001     n.severity = NOTIF_OKAY;
1002
1003   sstrncpy(n.host, hostname_g, sizeof(n.host));
1004   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
1005   sstrncpy(n.type, "gauge", sizeof(n.type));
1006   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
1007
1008   gen_message_payload(value, old_value, interface, timestamp, &buf);
1009
1010   notification_meta_t *m = calloc(1, sizeof(*m));
1011
1012   if (m == NULL) {
1013     char errbuf[1024];
1014     sfree(buf);
1015     ERROR("connectivity plugin: unable to allocate metadata: %s",
1016           sstrerror(errno, errbuf, sizeof(errbuf)));
1017     return;
1018   }
1019
1020   sstrncpy(m->name, "ves", sizeof(m->name));
1021   m->nm_value.nm_string = sstrdup(buf);
1022   m->type = NM_TYPE_STRING;
1023   n.meta = m;
1024
1025   DEBUG("connectivity plugin: notification message: %s",
1026         n.meta->nm_value.nm_string);
1027
1028   DEBUG("connectivity plugin: dispatching state %d for interface %s",
1029         (int)value, interface);
1030
1031   plugin_dispatch_notification(&n);
1032   plugin_notification_meta_free(n.meta);
1033
1034   // malloc'd in gen_message_payload
1035   if (buf != NULL)
1036     sfree(buf);
1037 }
1038
1039 static int connectivity_read(void) /* {{{ */
1040 {
1041   pthread_mutex_lock(&connectivity_lock);
1042
1043   if (connectivity_netlink_thread_error != 0) {
1044
1045     pthread_mutex_unlock(&connectivity_lock);
1046
1047     ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
1048           "it.");
1049
1050     stop_netlink_thread(0);
1051
1052     for (interface_list_t *il = interface_list_head; il != NULL;
1053          il = il->next) {
1054       il->status = LINK_STATE_UNKNOWN;
1055       il->prev_status = LINK_STATE_UNKNOWN;
1056       il->sent = 0;
1057     }
1058
1059     start_netlink_thread();
1060
1061     return (-1);
1062   } /* if (connectivity_netlink_thread_error != 0) */
1063
1064   if (connectivity_dequeue_thread_error != 0) {
1065
1066     pthread_mutex_unlock(&connectivity_lock);
1067
1068     ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
1069           "it.");
1070
1071     stop_dequeue_thread(0);
1072
1073     start_dequeue_thread();
1074
1075     return (-1);
1076   } /* if (connectivity_dequeue_thread_error != 0) */
1077
1078   pthread_mutex_unlock(&connectivity_lock);
1079
1080   return (0);
1081 } /* }}} int connectivity_read */
1082
1083 static int connectivity_shutdown(void) /* {{{ */
1084 {
1085   interface_list_t *il;
1086
1087   DEBUG("connectivity plugin: Shutting down thread.");
1088   if (stop_threads(1) < 0)
1089     return (-1);
1090
1091   il = interface_list_head;
1092   while (il != NULL) {
1093     interface_list_t *il_next;
1094
1095     il_next = il->next;
1096
1097     sfree(il->interface);
1098     sfree(il);
1099
1100     il = il_next;
1101   }
1102
1103   ignorelist_free(ignorelist);
1104
1105   return (0);
1106 } /* }}} int connectivity_shutdown */
1107
1108 void module_register(void) {
1109   plugin_register_config("connectivity", connectivity_config, config_keys,
1110                          config_keys_num);
1111   plugin_register_init("connectivity", connectivity_init);
1112   plugin_register_read("connectivity", connectivity_read);
1113   plugin_register_shutdown("connectivity", connectivity_shutdown);
1114 } /* void module_register */