2 * collectd - src/connectivity.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
25 * Aneesh Puttur <aputtur at redhat.com>
32 #include "utils_complain.h"
33 #include "utils_ignorelist.h"
35 #include <asm/types.h>
38 #include <netinet/in.h>
42 #include <sys/socket.h>
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
/* Alias for the NETLINK_ROUTE protocol family. */
58 #define MYPROTO NETLINK_ROUTE
/* Link states stored in the status / prev_status members of a list entry. */
60 #define LINK_STATE_DOWN 0
61 #define LINK_STATE_UP 1
62 #define LINK_STATE_UNKNOWN 2
/*
 * JSON keys (*_FIELD) and fixed values (*_VALUE) that gen_message_payload()
 * emits for the common event header.
 */
64 #define CONNECTIVITY_DOMAIN_FIELD "domain"
65 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
66 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
67 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
68 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
69 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
70 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
71 #define CONNECTIVITY_PRIORITY_FIELD "priority"
72 #define CONNECTIVITY_PRIORITY_VALUE "high"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
74 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
75 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
76 #define CONNECTIVITY_SEQUENCE_VALUE "0"
77 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
78 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
79 #define CONNECTIVITY_VERSION_FIELD "version"
80 #define CONNECTIVITY_VERSION_VALUE "1.0"
/*
 * JSON keys and values for the nested "stateChangeFields" map. The old and
 * new state share the same two value strings.
 */
82 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
83 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
84 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
85 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
86 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
87 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
89 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD \
90 "stateChangeFieldsVersion"
91 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
92 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
/*
 * Node of the singly linked list of interfaces tracked by this plugin. The
 * list is reached through interface_list_head.
 */
98 struct interface_list_s {
// Status the interface had before its most recent change.
102 uint32_t prev_status;
// Time of the last update in microseconds, taken from CDTIME_T_TO_US(cdtime()).
104 long long unsigned int timestamp;
// Next node in the list, or NULL at the end.
106 struct interface_list_s *next;
108 typedef struct interface_list_s interface_list_t;
// Interface filter: created in connectivity_config() and matched against
// device names in connectivity_link_state().
114 static ignorelist_t *ignorelist = NULL;
// Head of the tracked-interface list; add_interface() pushes new nodes here.
116 static interface_list_t *interface_list_head = NULL;
// Cleared by connectivity_config() when an "Interface" option is configured.
117 static int monitor_all_interfaces = 1;
// Netlink thread: run flag (the thread loops while it is > 0), error flag
// (polled by connectivity_read()) and thread handle.
119 static int connectivity_netlink_thread_loop = 0;
120 static int connectivity_netlink_thread_error = 0;
121 static pthread_t connectivity_netlink_thread_id;
// Dequeue thread: the same trio of run flag, error flag and thread handle.
122 static int connectivity_dequeue_thread_loop = 0;
123 static int connectivity_dequeue_thread_error = 0;
124 static pthread_t connectivity_dequeue_thread_id;
// Mutex and condition variable shared by both threads: read_event() signals
// the condition and read_interface_status() waits on it.
125 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
126 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
127 // static struct mnl_socket *sock;
// Netlink socket descriptor, opened by nl_connect().
128 static int nl_sock = -1;
// Counter incremented by gen_message_payload() for every generated event.
129 static int event_id = 0;
// Configuration keys handed to plugin_register_config() in module_register().
131 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
132 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
 * Forward declaration: send_interface_status() calls this function before its
 * definition appears later in the file.
 */
139 connectivity_dispatch_notification(const char *interface, const char *type,
140 gauge_t value, gauge_t old_value,
141 long long unsigned int timestamp);
/*
 * Build the JSON document that describes a link state change and hand it back
 * through *buf.
 *
 * state, old_state: new and previous link state. A value of 0 selects the
 *   "down" / "outOfService" strings; any other value selects "up" /
 *   "inService".
 * interface: interface name; emitted as sourceName and stateInterface and
 *   embedded in eventName.
 * timestamp: emitted as startEpochMicrosec.
 * buf: receives a malloc'd, NUL-terminated copy of the generated JSON.
 *
 * Side effect: increments the file-scope event_id counter on every call.
 * Returns an int status; presumably 0 on success and negative on failure —
 * TODO confirm against the return statements.
 */
147 static int gen_message_payload(int state, int old_state, const char *interface,
148 long long unsigned int timestamp, char **buf) {
149 const unsigned char *buf2;
// Scratch buffer reused for every value that has to be formatted as text.
151 char json_str[DATA_MAX_NAME_LEN];
153 #if !defined(HAVE_YAJL_V2)
154 yajl_gen_config conf = {};
161 g = yajl_gen_alloc(NULL);
162 yajl_gen_config(g, yajl_gen_beautify, 0);
165 g = yajl_gen_alloc(&conf, NULL);
170 // *** BEGIN common event header ***
172 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// domain
176 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
177 strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
180 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
181 strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
// eventId
185 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
186 strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
190 event_id = event_id + 1;
191 int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
192 memset(json_str, '\0', DATA_MAX_NAME_LEN);
193 snprintf(json_str, event_id_len, "%d", event_id);
195 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// eventName
200 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
201 strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
// NOTE(review): the snprintf bound below is derived from strlen(interface),
// not from sizeof(json_str); confirm it can never exceed DATA_MAX_NAME_LEN.
205 int event_name_len = 0;
206 event_name_len = event_name_len + strlen(interface); // interface name
207 event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
209 event_name_len + 12; // "interface", 2 spaces and null-terminator
210 memset(json_str, '\0', DATA_MAX_NAME_LEN);
211 snprintf(json_str, event_name_len, "interface %s %s", interface,
212 (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
213 : CONNECTIVITY_EVENT_NAME_UP_VALUE));
215 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
216 yajl_gen_status_ok) {
// lastEpochMicrosec: the current time, not the caller's timestamp.
221 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
222 strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
226 int last_epoch_microsec_len =
227 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
228 memset(json_str, '\0', DATA_MAX_NAME_LEN);
229 snprintf(json_str, last_epoch_microsec_len, "%llu",
230 (long long unsigned int)CDTIME_T_TO_US(cdtime()));
232 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// priority
237 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
238 strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
242 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
243 strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
247 // reportingEntityName
248 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
249 strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
253 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
254 strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
// sequence: always the constant CONNECTIVITY_SEQUENCE_VALUE.
259 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
260 strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
264 if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
265 strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
// sourceName: the interface name.
270 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
271 strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
275 if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
279 // startEpochMicrosec
280 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
281 strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
285 int start_epoch_microsec_len =
286 sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
287 memset(json_str, '\0', DATA_MAX_NAME_LEN);
288 snprintf(json_str, start_epoch_microsec_len, "%llu",
289 (long long unsigned int)timestamp);
291 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
// version: written with yajl_gen_number, i.e. as an unquoted JSON number.
296 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
297 strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
300 if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
301 strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
304 // *** END common event header ***
306 // *** BEGIN state change fields ***
308 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
309 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
313 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
// newState
317 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
318 strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
323 (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
324 : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
327 g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
328 : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
329 new_state_len) != yajl_gen_status_ok)
// oldState
333 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
334 strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
339 (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
340 : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
343 g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
344 : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
345 old_state_len) != yajl_gen_status_ok)
348 // stateChangeFieldsVersion
349 if (yajl_gen_string(g,
350 (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
351 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
355 if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
356 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
// stateInterface: the interface name again.
361 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
362 strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
366 if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
370 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
373 // *** END state change fields ***
375 if (yajl_gen_map_close(g) != yajl_gen_status_ok)
378 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
// Copy the generator's buffer into fresh storage for the caller. The size is
// taken with strlen(), not from len, so the copy ends at the first NUL byte.
381 *buf = malloc(strlen((char *)buf2) + 1);
385 ERROR("connectivity plugin: malloc failed during gen_message_payload: %s",
386 sstrerror(errno, errbuf, sizeof(errbuf)));
390 sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
398 ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
/*
 * Allocate a list node for `interface`, store a strdup'd copy of the name and
 * the given previous status, stamp the node with the current time in
 * microseconds and push it onto the front of the interface list.
 *
 * An ERROR is logged when malloc or strdup fails. The caller in
 * connectivity_link_state() stores the returned pointer; presumably NULL
 * signals failure — TODO confirm.
 */
402 static interface_list_t *add_interface(const char *interface, int status,
404 interface_list_t *il;
407 il = malloc(sizeof(*il));
410 ERROR("connectivity plugin: malloc failed during add_interface: %s",
411 sstrerror(errno, errbuf, sizeof(errbuf)));
// The node keeps its own copy of the name, not the caller's pointer.
415 interface2 = strdup(interface);
416 if (interface2 == NULL) {
419 ERROR("connectivity plugin: strdup failed during add_interface: %s",
420 sstrerror(errno, errbuf, sizeof(errbuf)));
424 il->interface = interface2;
426 il->prev_status = prev_status;
427 il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
// Insert at the head of the list.
429 il->next = interface_list_head;
430 interface_list_head = il;
432 DEBUG("connectivity plugin: added interface %s", interface2);
/*
 * Process one netlink link message: locate its IFLA_IFNAME attribute, skip
 * devices rejected by the ignorelist, find the matching list entry (adding one
 * through add_interface() for a device not seen before) and record the link
 * state derived from IFF_RUNNING together with a fresh timestamp.
 *
 * connectivity_lock is taken at the top and released again on the error path
 * and at the end of the function.
 */
437 static int connectivity_link_state(struct nlmsghdr *msg) {
439 struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
441 const char *dev = NULL;
443 pthread_mutex_lock(&connectivity_lock);
445 interface_list_t *il = NULL;
447 /* Scan attribute list for device name. */
448 mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
449 if (mnl_attr_get_type(attr) != IFLA_IFNAME)
452 if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
453 ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
456 pthread_mutex_unlock(&connectivity_lock);
460 dev = mnl_attr_get_str(attr);
462 // Check the list of interfaces we should monitor, if we've chosen
463 // a subset. If we don't care about this one, abort.
464 if (ignorelist_match(ignorelist, dev) != 0) {
465 DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
// Linear search of the tracked interfaces by name.
471 for (il = interface_list_head; il != NULL; il = il->next)
472 if (strcmp(dev, il->interface) == 0)
475 uint32_t prev_status;
478 // We haven't encountered this interface yet, so add it to the linked list
479 il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
482 ERROR("connectivity plugin: unable to add interface %s during "
483 "connectivity_link_state",
// Remember the old status, then derive the new one from IFF_RUNNING.
489 prev_status = il->status;
491 ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
492 il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
494 // If the new status is different than the previous status,
495 // store the previous status and set sent to zero
496 if (il->status != prev_status) {
497 il->prev_status = prev_status;
501 DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
502 il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
504 // no need to loop again, we found the interface name attr
505 // (otherwise the first if-statement in the loop would
506 // have moved us on with 'continue')
510 pthread_mutex_unlock(&connectivity_lock);
/*
 * Route a netlink message according to msg->nlmsg_type. One branch hands the
 * message to connectivity_link_state(); a type that is not handled is logged
 * as an error.
 * NOTE(review): confirm which nlmsg_type values map to which branch.
 */
515 static int msg_handler(struct nlmsghdr *msg) {
516 switch (msg->nlmsg_type) {
522 // Not of interest in current version
525 connectivity_link_state(msg);
528 ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
/*
 * Receive netlink messages from socket `nl` and pass each one to msg_handler.
 *
 * The run flag connectivity_netlink_thread_loop is checked under
 * connectivity_lock before each recv(). Receiving starts with MSG_DONTWAIT;
 * when recv() reports EWOULDBLOCK/EAGAIN the dequeue thread is woken through
 * connectivity_cond. NLMSG_ERROR messages, handler failures and a NULL
 * handler are logged as errors.
 */
535 // static int read_event(struct mnl_socket *nl,
536 // int (*msg_handler)(struct nlmsghdr *)) {
537 static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
542 int recv_flags = MSG_DONTWAIT;
// Stop as soon as the netlink thread has been asked to exit.
551 pthread_mutex_lock(&connectivity_lock);
553 if (connectivity_netlink_thread_loop <= 0) {
554 pthread_mutex_unlock(&connectivity_lock);
558 pthread_mutex_unlock(&connectivity_lock);
560 status = recv(nl, buf, sizeof(buf), recv_flags);
564 // If there were no more messages to drain from the socket,
565 // then signal the dequeue thread and allow it to dispatch
566 // any saved interface status changes. Then continue, but
567 // block and wait for new messages
568 if (errno == EWOULDBLOCK || errno == EAGAIN) {
569 pthread_mutex_lock(&connectivity_lock);
570 pthread_cond_signal(&connectivity_cond);
571 pthread_mutex_unlock(&connectivity_lock);
577 /* Anything else is an error */
578 // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom:
581 ERROR("connectivity plugin: read_event: Error recv: %d\n", status);
585 // Message received successfully, so we'll stop blocking on the
586 // receive call for now (until we get a "would block" error, which
587 // will be handled above)
588 recv_flags = MSG_DONTWAIT;
591 DEBUG("connectivity plugin: read_event: EOF\n");
594 /* We need to handle more than one message per 'recvmsg' */
595 for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
596 h = NLMSG_NEXT(h, status)) {
598 if (h->nlmsg_type == NLMSG_DONE)
601 /* Message is some kind of error */
602 if (h->nlmsg_type == NLMSG_ERROR) {
603 ERROR("connectivity plugin: read_event: Message is an error\n");
607 /* Call message handler */
609 ret = (*msg_handler)(h);
611 ERROR("connectivity plugin: read_event: Message handler error %d\n",
616 ERROR("connectivity plugin: read_event: Error NULL message handler\n");
/*
 * Walk the interface list and dispatch a notification for every entry whose
 * status differs from its previous status and whose `sent` value is 0.
 * read_interface_status() calls this with connectivity_lock held.
 */
625 static void send_interface_status() {
626 for (interface_list_t *il = interface_list_head; il != NULL;
627 il = il->next) /* {{{ */
630 uint32_t prev_status;
634 prev_status = il->prev_status;
// Only changes that have not been reported yet are dispatched.
637 if (status != prev_status && sent == 0) {
638 connectivity_dispatch_notification(il->interface, "gauge", status,
639 prev_status, il->timestamp);
642 } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
/*
 * One iteration of the dequeue thread: with connectivity_lock held, report any
 * pending status changes, wait on connectivity_cond, then report again once
 * woken.
 * NOTE(review): the wait is a single pthread_cond_wait() with no predicate
 * loop around it — confirm that spurious wakeups are harmless here.
 */
645 static int read_interface_status() /* {{{ */
647 pthread_mutex_lock(&connectivity_lock);
649 // This first attempt is necessary because the netlink thread
650 // might have held the lock while this thread was blocked on
651 // the lock acquisition just above. And while the netlink thread
652 // had the lock, it could have called pthread_cond_signal, which
653 // obviously wouldn't have woken this thread, since this thread
654 // was not yet waiting on the condition signal. So we need to
655 // loop through the interfaces and check if any have changed
656 // status before we wait on the condition signal
657 send_interface_status();
659 pthread_cond_wait(&connectivity_cond, &connectivity_lock);
661 send_interface_status();
663 pthread_mutex_unlock(&connectivity_lock);
666 } /* }}} int read_interface_status */
/*
 * Body of the netlink thread: calls read_event() on nl_sock for as long as
 * connectivity_netlink_thread_loop stays positive. The lock is held while the
 * run flag is examined and released around read_event().
 * connectivity_netlink_thread_error is set to 1 inside the loop; presumably
 * when read_event() reports failure — TODO confirm the condition.
 */
668 static void *connectivity_netlink_thread(void *arg) /* {{{ */
670 pthread_mutex_lock(&connectivity_lock);
672 while (connectivity_netlink_thread_loop > 0) {
// Do not hold the lock across the receive call.
675 pthread_mutex_unlock(&connectivity_lock);
677 status = read_event(nl_sock, msg_handler);
679 pthread_mutex_lock(&connectivity_lock);
682 connectivity_netlink_thread_error = 1;
686 if (connectivity_netlink_thread_loop <= 0)
688 } /* while (connectivity_netlink_thread_loop > 0) */
690 pthread_mutex_unlock(&connectivity_lock);
693 } /* }}} void *connectivity_netlink_thread */
/*
 * Body of the dequeue thread: calls read_interface_status() for as long as
 * connectivity_dequeue_thread_loop stays positive. The lock is held while the
 * run flag is examined and released around read_interface_status(), which
 * takes the lock itself.
 * connectivity_dequeue_thread_error is set to 1 inside the loop; presumably
 * when read_interface_status() reports failure — TODO confirm the condition.
 */
695 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
697 pthread_mutex_lock(&connectivity_lock);
699 while (connectivity_dequeue_thread_loop > 0) {
702 pthread_mutex_unlock(&connectivity_lock);
704 status = read_interface_status();
706 pthread_mutex_lock(&connectivity_lock);
709 connectivity_dequeue_thread_error = 1;
713 if (connectivity_dequeue_thread_loop <= 0)
715 } /* while (connectivity_dequeue_thread_loop > 0) */
717 pthread_mutex_unlock(&connectivity_lock);
720 } /* }}} void *connectivity_dequeue_thread */
/*
 * Open a NETLINK_ROUTE datagram socket into the file-scope nl_sock and bind it
 * to the RTMGRP_LINK multicast group, with nl_pid set to this process's pid.
 * Socket and bind failures are logged together with errno.
 * NOTE(review): confirm sa_nl is zeroed before its members are assigned, so
 * the fields not set here hold defined values when passed to bind().
 */
722 static int nl_connect() {
724 struct sockaddr_nl sa_nl;
726 nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
728 ERROR("connectivity plugin: socket open failed: %d", errno);
732 sa_nl.nl_family = AF_NETLINK;
733 sa_nl.nl_groups = RTMGRP_LINK;
734 sa_nl.nl_pid = getpid();
736 rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
738 ERROR("connectivity plugin: socket bind failed: %d", errno);
/*
 * Start the netlink thread unless its run flag is already non-zero. Under
 * connectivity_lock this sets the run flag, clears the error flag, calls
 * nl_connect() and creates the thread. When thread creation fails the run
 * flag is reset, the lock is released and the socket is closed.
 * NOTE(review): the run flag is set before nl_connect(); confirm that a failed
 * nl_connect() also resets it, otherwise later start attempts return early.
 */
746 static int start_netlink_thread(void) /* {{{ */
750 pthread_mutex_lock(&connectivity_lock);
// Already running (or being started): nothing to do.
752 if (connectivity_netlink_thread_loop != 0) {
753 pthread_mutex_unlock(&connectivity_lock);
757 connectivity_netlink_thread_loop = 1;
758 connectivity_netlink_thread_error = 0;
761 status = nl_connect();
767 status = plugin_thread_create(&connectivity_netlink_thread_id,
768 /* attr = */ NULL, connectivity_netlink_thread,
769 /* arg = */ (void *)0, "connectivity");
// Thread creation failed: undo the run flag and release the socket.
771 connectivity_netlink_thread_loop = 0;
772 ERROR("connectivity plugin: Starting thread failed.");
773 pthread_mutex_unlock(&connectivity_lock);
775 int status2 = close(nl_sock);
778 ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
779 status2, strerror(errno));
786 pthread_mutex_unlock(&connectivity_lock);
/*
 * Start the dequeue thread unless its run flag is already non-zero. Under
 * connectivity_lock this sets the run flag, clears the error flag and creates
 * the thread. When creation fails the run flag is reset and an error is
 * logged.
 */
791 static int start_dequeue_thread(void) /* {{{ */
795 pthread_mutex_lock(&connectivity_lock);
// Already running (or being started): nothing to do.
797 if (connectivity_dequeue_thread_loop != 0) {
798 pthread_mutex_unlock(&connectivity_lock);
802 connectivity_dequeue_thread_loop = 1;
803 connectivity_dequeue_thread_error = 0;
805 status = plugin_thread_create(&connectivity_dequeue_thread_id,
806 /* attr = */ NULL, connectivity_dequeue_thread,
807 /* arg = */ (void *)0, "connectivity");
809 connectivity_dequeue_thread_loop = 0;
810 ERROR("connectivity plugin: Starting dequeue thread failed.");
811 pthread_mutex_unlock(&connectivity_lock);
815 pthread_mutex_unlock(&connectivity_lock);
818 } /* }}} int start_dequeue_thread */
/*
 * Start the netlink thread and then the dequeue thread. Both start functions
 * are called one after the other, each with its own status variable.
 * NOTE(review): confirm how the two status values are combined into the
 * return value.
 */
820 static int start_threads(void) /* {{{ */
824 status = start_netlink_thread();
825 status2 = start_dequeue_thread();
831 } /* }}} int start_threads */
/*
 * Stop the netlink thread: close the netlink socket, clear the run flag and
 * broadcast on connectivity_cond under the lock, then cancel and/or join the
 * thread and finally reset its handle and error flag.
 *
 * shutdown: per the comment in the body, pthread_cancel() is meant for the
 * process-shutdown case, where the thread may still be blocked in its receive
 * call.
 */
833 static int stop_netlink_thread(int shutdown) /* {{{ */
838 status = close(nl_sock);
840 ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
841 status, strerror(errno));
847 pthread_mutex_lock(&connectivity_lock);
// Thread is not running: nothing to stop.
849 if (connectivity_netlink_thread_loop == 0) {
850 pthread_mutex_unlock(&connectivity_lock);
854 connectivity_netlink_thread_loop = 0;
855 pthread_cond_broadcast(&connectivity_cond);
856 pthread_mutex_unlock(&connectivity_lock);
859 // Since the thread is blocking, calling pthread_join
860 // doesn't actually succeed in stopping it. It will stick around
861 // until a NETLINK message is received on the socket (at which point
862 // it will realize that "connectivity_netlink_thread_loop" is 0 and will
863 // break out of the read loop and be allowed to die). This is
864 // fine when the process isn't supposed to be exiting, but in
865 // the case of a process shutdown, we don't want to have an
866 // idle thread hanging around. Calling pthread_cancel here in
867 // the case of a shutdown just assures that the thread is
868 // gone and that the process has been fully terminated.
870 DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
872 status = pthread_cancel(connectivity_netlink_thread_id);
// ESRCH is tolerated and not reported as an error.
874 if (status != 0 && status != ESRCH) {
875 ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status);
880 status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
881 if (status != 0 && status != ESRCH) {
882 ERROR("connectivity plugin: Stopping netlink thread failed.");
// Reset the bookkeeping so the thread can be started again.
888 pthread_mutex_lock(&connectivity_lock);
889 memset(&connectivity_netlink_thread_id, 0,
890 sizeof(connectivity_netlink_thread_id));
891 connectivity_netlink_thread_error = 0;
892 pthread_mutex_unlock(&connectivity_lock);
894 DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
/*
 * Stop the dequeue thread: clear the run flag and broadcast on
 * connectivity_cond under the lock so that a thread waiting in
 * read_interface_status() wakes up, then cancel and/or join the thread and
 * finally reset its handle and error flag.
 *
 * shutdown: per the comment in the body, pthread_cancel() is meant for the
 * process-shutdown case.
 */
899 static int stop_dequeue_thread(int shutdown) /* {{{ */
903 pthread_mutex_lock(&connectivity_lock);
// Thread is not running: nothing to stop.
905 if (connectivity_dequeue_thread_loop == 0) {
906 pthread_mutex_unlock(&connectivity_lock);
910 connectivity_dequeue_thread_loop = 0;
911 pthread_cond_broadcast(&connectivity_cond);
912 pthread_mutex_unlock(&connectivity_lock);
915 // Calling pthread_cancel here in
916 // the case of a shutdown just assures that the thread is
917 // gone and that the process has been fully terminated.
919 DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
921 status = pthread_cancel(connectivity_dequeue_thread_id);
// ESRCH is tolerated and not reported as an error.
923 if (status != 0 && status != ESRCH) {
924 ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
929 status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
930 if (status != 0 && status != ESRCH) {
931 ERROR("connectivity plugin: Stopping dequeue thread failed.");
// Reset the bookkeeping so the thread can be started again.
937 pthread_mutex_lock(&connectivity_lock);
938 memset(&connectivity_dequeue_thread_id, 0,
939 sizeof(connectivity_dequeue_thread_id));
940 connectivity_dequeue_thread_error = 0;
941 pthread_mutex_unlock(&connectivity_lock);
943 DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
946 } /* }}} int stop_dequeue_thread */
/*
 * Stop the netlink thread and then the dequeue thread, passing `shutdown`
 * through to both. Both stop functions are called one after the other, each
 * with its own status variable.
 * NOTE(review): confirm how the two status values are combined into the
 * return value.
 */
948 static int stop_threads(int shutdown) /* {{{ */
952 status = stop_netlink_thread(shutdown);
953 status2 = stop_dequeue_thread(shutdown);
959 } /* }}} int stop_threads */
/*
 * Plugin init callback: emits a NOTICE when no "Interface" option was
 * configured (monitor_all_interfaces is still set), then starts both worker
 * threads and returns the result of start_threads().
 */
961 static int connectivity_init(void) /* {{{ */
963 if (monitor_all_interfaces) {
964 NOTICE("connectivity plugin: No interfaces have been selected, so all will "
968 return (start_threads());
969 } /* }}} int connectivity_init */
/*
 * Plugin config callback for the "Interface" and "IgnoreSelected" keys; keys
 * are compared case-insensitively.
 *
 * The ignorelist is created on first use with invert = 1. "Interface" adds
 * `value` to the list and clears monitor_all_interfaces. "IgnoreSelected"
 * updates the list's invert setting.
 */
971 static int connectivity_config(const char *key, const char *value) /* {{{ */
973 if (ignorelist == NULL) {
974 ignorelist = ignorelist_create(/* invert = */ 1);
977 if (strcasecmp(key, "Interface") == 0) {
978 ignorelist_add(ignorelist, value);
979 monitor_all_interfaces = 0;
980 } else if (strcasecmp(key, "IgnoreSelected") == 0) {
984 ignorelist_set_invert(ignorelist, invert);
990 } /* }}} int connectivity_config */
/*
 * Build and dispatch a collectd notification for a link state change on
 * `interface`.
 *
 * Severity starts as NOTIF_FAILURE and becomes NOTIF_OKAY when value equals
 * LINK_STATE_UP. The JSON text produced by gen_message_payload() is attached
 * as string metadata named "ves". The metadata is freed after dispatch.
 *
 * NOTE(review): the `type` parameter does not appear to be used; n.type is set
 * from the literal "gauge".
 * NOTE(review): the status returned by gen_message_payload() is not checked,
 * so buf may not be valid when it is passed to sstrdup() — confirm how a
 * failed payload is handled.
 */
993 connectivity_dispatch_notification(const char *interface, const char *type,
994 gauge_t value, gauge_t old_value,
995 long long unsigned int timestamp) {
998 NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
1000 if (value == LINK_STATE_UP)
1001 n.severity = NOTIF_OKAY;
1003 sstrncpy(n.host, hostname_g, sizeof(n.host));
1004 sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
1005 sstrncpy(n.type, "gauge", sizeof(n.type));
1006 sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
1008 gen_message_payload(value, old_value, interface, timestamp, &buf);
1010 notification_meta_t *m = calloc(1, sizeof(*m));
1015 ERROR("connectivity plugin: unable to allocate metadata: %s",
1016 sstrerror(errno, errbuf, sizeof(errbuf)));
// The metadata entry holds its own duplicate of the payload string.
1020 sstrncpy(m->name, "ves", sizeof(m->name));
1021 m->nm_value.nm_string = sstrdup(buf);
1022 m->type = NM_TYPE_STRING;
1025 DEBUG("connectivity plugin: notification message: %s",
1026 n.meta->nm_value.nm_string);
1028 DEBUG("connectivity plugin: dispatching state %d for interface %s",
1029 (int)value, interface);
1031 plugin_dispatch_notification(&n);
1032 plugin_notification_meta_free(n.meta);
1034 // malloc'd in gen_message_payload
/*
 * Plugin read callback, used as a watchdog for the two worker threads. Under
 * connectivity_lock it checks each thread's error flag. A thread that reported
 * an error is stopped (with shutdown = 0) and started again. Before the
 * netlink thread is restarted, every tracked interface is reset to
 * LINK_STATE_UNKNOWN.
 *
 * NOTE(review): the list reset runs after connectivity_lock has been released
 * while the dequeue thread may still be walking the list — confirm whether
 * the reset needs to hold the lock.
 */
1039 static int connectivity_read(void) /* {{{ */
1041 pthread_mutex_lock(&connectivity_lock);
1043 if (connectivity_netlink_thread_error != 0) {
// Release the lock first: the stop/start helpers take it themselves.
1045 pthread_mutex_unlock(&connectivity_lock);
1047 ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
1050 stop_netlink_thread(0);
1052 for (interface_list_t *il = interface_list_head; il != NULL;
1054 il->status = LINK_STATE_UNKNOWN;
1055 il->prev_status = LINK_STATE_UNKNOWN;
1059 start_netlink_thread();
1062 } /* if (connectivity_netlink_thread_error != 0) */
1064 if (connectivity_dequeue_thread_error != 0) {
1066 pthread_mutex_unlock(&connectivity_lock);
1068 ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
1071 stop_dequeue_thread(0);
1073 start_dequeue_thread();
1076 } /* if (connectivity_dequeue_thread_error != 0) */
1078 pthread_mutex_unlock(&connectivity_lock);
1081 } /* }}} int connectivity_read */
/*
 * Plugin shutdown callback: stops both worker threads with shutdown = 1, then
 * walks the interface list releasing each entry's name string, and finally
 * frees the ignorelist.
 */
1083 static int connectivity_shutdown(void) /* {{{ */
1085 interface_list_t *il;
1087 DEBUG("connectivity plugin: Shutting down thread.");
1088 if (stop_threads(1) < 0)
1091 il = interface_list_head;
1092 while (il != NULL) {
1093 interface_list_t *il_next;
1097 sfree(il->interface);
1103 ignorelist_free(ignorelist);
1106 } /* }}} int connectivity_shutdown */
/*
 * Plugin entry point: registers the config, init, read and shutdown callbacks
 * under the plugin name "connectivity".
 */
1108 void module_register(void) {
1109 plugin_register_config("connectivity", connectivity_config, config_keys,
1111 plugin_register_init("connectivity", connectivity_init);
1112 plugin_register_read("connectivity", connectivity_read);
1113 plugin_register_shutdown("connectivity", connectivity_shutdown);
1114 } /* void module_register */