connectivity notifications
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <net/if.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <libmnl/libmnl.h>
45 #include <linux/netlink.h>
46 #include <linux/rtnetlink.h>
47
48 #include <yajl/yajl_common.h>
49 #include <yajl/yajl_gen.h>
50 #if HAVE_YAJL_YAJL_VERSION_H
51 #include <yajl/yajl_version.h>
52 #endif
53 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
54 #define HAVE_YAJL_V2 1
55 #endif
56
57 #define MYPROTO NETLINK_ROUTE
58
59 #define CONNECTIVITY_DOMAIN_FIELD "domain"
60 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
61 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
62 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
63 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
64 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
65 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
66 #define CONNECTIVITY_PRIORITY_FIELD "priority"
67 #define CONNECTIVITY_PRIORITY_VALUE "high"
68 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
69 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
70 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
71 #define CONNECTIVITY_SEQUENCE_VALUE "0"
72 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
73 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
74 #define CONNECTIVITY_VERSION_FIELD "version"
75 #define CONNECTIVITY_VERSION_VALUE "1.0"
76
77 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
78 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
79 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
80 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
81 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
82 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
83 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
84 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
85   "stateChangeFieldsVersion"
86 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
87 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
88
89 /*
90  * Private data types
91  */
92 struct interfacelist_s {
93   char *interface;
94
95   uint32_t status;
96   uint32_t prev_status;
97   uint32_t sent;
98   long long unsigned int timestamp;
99
100   struct interfacelist_s *next;
101 };
102 typedef struct interfacelist_s interfacelist_t;
103
104 /*
105  * Private variables
106  */
107 static interfacelist_t *interfacelist_head = NULL;
108
109 static int connectivity_thread_loop = 0;
110 static int connectivity_thread_error = 0;
111 static pthread_t connectivity_thread_id;
112 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
113 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
114 static struct mnl_socket *sock;
115 static int event_id = 0;
116
117 static const char *config_keys[] = {"Interface"};
118 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
119
120 /*
121  * Private functions
122  */
123
124 static int gen_message_payload(int state, int old_state, const char *interface,
125                                long long unsigned int timestamp, char **buf) {
126   const unsigned char *buf2;
127   yajl_gen g;
128
129 #if !defined(HAVE_YAJL_V2)
130   yajl_gen_config conf = {};
131
132   conf.beautify = 0;
133 #endif
134
135 #if HAVE_YAJL_V2
136   size_t len;
137   g = yajl_gen_alloc(NULL);
138   yajl_gen_config(g, yajl_gen_beautify, 0);
139 #else
140   unsigned int len;
141   g = yajl_gen_alloc(&conf, NULL);
142 #endif
143
144   yajl_gen_clear(g);
145
146   // *** BEGIN common event header ***
147
148   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
149     goto err;
150
151   // domain
152   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
153                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
154     goto err;
155
156   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
157                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
158     goto err;
159
160   // eventId
161   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
162                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
163       yajl_gen_status_ok)
164     goto err;
165
166   event_id = event_id + 1;
167   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
168   char *event_id_str = malloc(event_id_len);
169   snprintf(event_id_str, event_id_len, "%d", event_id);
170
171   if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
172       yajl_gen_status_ok) {
173     sfree(event_id_str);
174     goto err;
175   }
176
177   sfree(event_id_str);
178
179   // eventName
180   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
181                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
182       yajl_gen_status_ok)
183     goto err;
184
185   int event_name_len = 0;
186   event_name_len = event_name_len + strlen(interface);    // interface name
187   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
188   event_name_len =
189       event_name_len + 12; // "interface", 2 spaces and null-terminator
190   char *event_name_str = malloc(event_name_len);
191   memset(event_name_str, '\0', event_name_len);
192   snprintf(event_name_str, event_name_len, "interface %s %s", interface,
193            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
194                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
195
196   if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
197       yajl_gen_status_ok) {
198     sfree(event_name_str);
199     goto err;
200   }
201
202   sfree(event_name_str);
203
204   // lastEpochMicrosec
205   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
206                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
207       yajl_gen_status_ok)
208     goto err;
209
210   int last_epoch_microsec_len =
211       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
212   char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
213   snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
214            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
215
216   if (yajl_gen_number(g, last_epoch_microsec_str,
217                       strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
218     sfree(last_epoch_microsec_str);
219     goto err;
220   }
221
222   sfree(last_epoch_microsec_str);
223
224   // priority
225   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
226                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
227       yajl_gen_status_ok)
228     goto err;
229
230   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
231                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
232       yajl_gen_status_ok)
233     goto err;
234
235   // reportingEntityName
236   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
237                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
238       yajl_gen_status_ok)
239     goto err;
240
241   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
242                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
243       yajl_gen_status_ok)
244     goto err;
245
246   // sequence
247   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
248                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
249       yajl_gen_status_ok)
250     goto err;
251
252   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
253                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
254       yajl_gen_status_ok)
255     goto err;
256
257   // sourceName
258   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
259                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
260       yajl_gen_status_ok)
261     goto err;
262
263   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
264       yajl_gen_status_ok)
265     goto err;
266
267   // startEpochMicrosec
268   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
269                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
270       yajl_gen_status_ok)
271     goto err;
272
273   int start_epoch_microsec_len =
274       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
275   char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
276   snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
277            (long long unsigned int)timestamp);
278
279   if (yajl_gen_number(g, start_epoch_microsec_str,
280                       strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
281     sfree(start_epoch_microsec_str);
282     goto err;
283   }
284
285   sfree(start_epoch_microsec_str);
286
287   // version
288   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
289                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
290     goto err;
291
292   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
293                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
294     goto err;
295
296   // *** END common event header ***
297
298   // *** BEGIN state change fields ***
299
300   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
301                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
302       yajl_gen_status_ok)
303     goto err;
304
305   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
306     goto err;
307
308   // newState
309   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
310                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
311       yajl_gen_status_ok)
312     goto err;
313
314   int new_state_len =
315       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
316                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
317
318   if (yajl_gen_string(
319           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
320                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
321           new_state_len) != yajl_gen_status_ok)
322     goto err;
323
324   // oldState
325   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
326                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
327       yajl_gen_status_ok)
328     goto err;
329
330   int old_state_len =
331       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
332                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
333
334   if (yajl_gen_string(
335           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
336                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
337           old_state_len) != yajl_gen_status_ok)
338     goto err;
339
340   // stateChangeFieldsVersion
341   if (yajl_gen_string(g,
342                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
343                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
344       yajl_gen_status_ok)
345     goto err;
346
347   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
348                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
349       yajl_gen_status_ok)
350     goto err;
351
352   // stateInterface
353   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
354                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
355       yajl_gen_status_ok)
356     goto err;
357
358   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
359       yajl_gen_status_ok)
360     goto err;
361
362   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
363     goto err;
364
365   // *** END state change fields ***
366
367   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
368     goto err;
369
370   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
371     goto err;
372
373   *buf = malloc(strlen((char *)buf2) + 1);
374
375   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
376
377   yajl_gen_free(g);
378
379   return 0;
380
381 err:
382   yajl_gen_free(g);
383   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
384   return -1;
385 }
386
387 static int connectivity_link_state(struct nlmsghdr *msg) {
388   int retval = 0;
389   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
390   struct nlattr *attr;
391   const char *dev = NULL;
392
393   pthread_mutex_lock(&connectivity_lock);
394
395   interfacelist_t *il;
396
397   /* Scan attribute list for device name. */
398   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
399     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
400       continue;
401
402     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
403       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
404             "mnl_attr_validate "
405             "failed.");
406       pthread_mutex_unlock(&connectivity_lock);
407       return MNL_CB_ERROR;
408     }
409
410     dev = mnl_attr_get_str(attr);
411
412     for (il = interfacelist_head; il != NULL; il = il->next)
413       if (strcmp(dev, il->interface) == 0)
414         break;
415
416     if (il == NULL) {
417       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
418             "interface: %s",
419             dev);
420     } else {
421       uint32_t prev_status;
422
423       prev_status = il->status;
424       il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0);
425       il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
426
427       // If the new status is different than the previous status,
428       // store the previous status and set sent to zero
429       if (il->status != prev_status) {
430         il->prev_status = prev_status;
431         il->sent = 0;
432       }
433
434       DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
435             il->timestamp, dev,
436             ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
437     }
438
439     // no need to loop again, we found the interface name attr
440     // (otherwise the first if-statement in the loop would
441     // have moved us on with 'continue')
442     break;
443   }
444
445   pthread_mutex_unlock(&connectivity_lock);
446
447   return retval;
448 }
449
450 static int msg_handler(struct nlmsghdr *msg) {
451   switch (msg->nlmsg_type) {
452   case RTM_NEWADDR:
453     break;
454   case RTM_DELADDR:
455     break;
456   case RTM_NEWROUTE:
457     break;
458   case RTM_DELROUTE:
459     break;
460   case RTM_NEWLINK:
461     connectivity_link_state(msg);
462     break;
463   case RTM_DELLINK:
464     break;
465   default:
466     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
467           msg->nlmsg_type);
468     break;
469   }
470   return 0;
471 }
472
473 static int read_event(struct mnl_socket *nl,
474                       int (*msg_handler)(struct nlmsghdr *)) {
475   int status;
476   int ret = 0;
477   char buf[4096];
478   struct nlmsghdr *h;
479
480   if (nl == NULL)
481     return ret;
482
483   status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
484
485   if (status < 0) {
486     /* Socket non-blocking so bail out once we have read everything */
487     if (errno == EWOULDBLOCK || errno == EAGAIN)
488       return ret;
489
490     /* Anything else is an error */
491     ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
492           status);
493     return status;
494   }
495
496   if (status == 0) {
497     DEBUG("connectivity plugin: read_event: EOF\n");
498   }
499
500   /* We need to handle more than one message per 'recvmsg' */
501   for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
502        h = NLMSG_NEXT(h, status)) {
503     /* Finish reading */
504     if (h->nlmsg_type == NLMSG_DONE)
505       return ret;
506
507     /* Message is some kind of error */
508     if (h->nlmsg_type == NLMSG_ERROR) {
509       ERROR("connectivity plugin: read_event: Message is an error\n");
510       return -1; // Error
511     }
512
513     /* Call message handler */
514     if (msg_handler) {
515       ret = (*msg_handler)(h);
516       if (ret < 0) {
517         ERROR("connectivity plugin: read_event: Message handler error %d\n",
518               ret);
519         return ret;
520       }
521     } else {
522       ERROR("connectivity plugin: read_event: Error NULL message handler\n");
523       return -1;
524     }
525   }
526
527   return ret;
528 }
529
530 static void *connectivity_thread(void *arg) /* {{{ */
531 {
532   pthread_mutex_lock(&connectivity_lock);
533
534   while (connectivity_thread_loop > 0) {
535     int status;
536
537     pthread_mutex_unlock(&connectivity_lock);
538
539     status = read_event(sock, msg_handler);
540
541     pthread_mutex_lock(&connectivity_lock);
542
543     if (status < 0) {
544       connectivity_thread_error = 1;
545       break;
546     }
547
548     if (connectivity_thread_loop <= 0)
549       break;
550   } /* while (connectivity_thread_loop > 0) */
551
552   pthread_mutex_unlock(&connectivity_lock);
553
554   return ((void *)0);
555 } /* }}} void *connectivity_thread */
556
557 static int start_thread(void) /* {{{ */
558 {
559   int status;
560
561   pthread_mutex_lock(&connectivity_lock);
562
563   if (connectivity_thread_loop != 0) {
564     pthread_mutex_unlock(&connectivity_lock);
565     return (0);
566   }
567
568   connectivity_thread_loop = 1;
569   connectivity_thread_error = 0;
570
571   if (sock == NULL) {
572     sock = mnl_socket_open(NETLINK_ROUTE);
573     if (sock == NULL) {
574       ERROR(
575           "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
576       pthread_mutex_unlock(&connectivity_lock);
577       return (-1);
578     }
579
580     if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
581       ERROR(
582           "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
583       pthread_mutex_unlock(&connectivity_lock);
584       return (1);
585     }
586   }
587
588   status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
589                                 connectivity_thread,
590                                 /* arg = */ (void *)0, "connectivity");
591   if (status != 0) {
592     connectivity_thread_loop = 0;
593     ERROR("connectivity plugin: Starting thread failed.");
594     pthread_mutex_unlock(&connectivity_lock);
595     mnl_socket_close(sock);
596     return (-1);
597   }
598
599   pthread_mutex_unlock(&connectivity_lock);
600   return (0);
601 } /* }}} int start_thread */
602
603 static int stop_thread(int shutdown) /* {{{ */
604 {
605   int status;
606
607   if (sock != NULL)
608     mnl_socket_close(sock);
609
610   pthread_mutex_lock(&connectivity_lock);
611
612   if (connectivity_thread_loop == 0) {
613     pthread_mutex_unlock(&connectivity_lock);
614     return (-1);
615   }
616
617   connectivity_thread_loop = 0;
618   pthread_cond_broadcast(&connectivity_cond);
619   pthread_mutex_unlock(&connectivity_lock);
620
621   if (shutdown == 1) {
622     // Since the thread is blocking, calling pthread_join
623     // doesn't actually succeed in stopping it.  It will stick around
624     // until a NETLINK message is received on the socket (at which
625     // it will realize that "connectivity_thread_loop" is 0 and will
626     // break out of the read loop and be allowed to die).  This is
627     // fine when the process isn't supposed to be exiting, but in
628     // the case of a process shutdown, we don't want to have an
629     // idle thread hanging around.  Calling pthread_cancel here in
630     // the case of a shutdown is just assures that the thread is
631     // gone and that the process has been fully terminated.
632
633     DEBUG("connectivity plugin: Canceling thread for process shutdown");
634
635     status = pthread_cancel(connectivity_thread_id);
636
637     if (status != 0) {
638       ERROR("connectivity plugin: Unable to cancel thread: %d", status);
639       status = -1;
640     }
641   } else {
642     status = pthread_join(connectivity_thread_id, /* return = */ NULL);
643     if (status != 0) {
644       ERROR("connectivity plugin: Stopping thread failed.");
645       status = -1;
646     }
647   }
648
649   pthread_mutex_lock(&connectivity_lock);
650   memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
651   connectivity_thread_error = 0;
652   pthread_mutex_unlock(&connectivity_lock);
653
654   DEBUG("connectivity plugin: Finished requesting stop of thread");
655
656   return (status);
657 } /* }}} int stop_thread */
658
659 static int connectivity_init(void) /* {{{ */
660 {
661   if (interfacelist_head == NULL) {
662     NOTICE("connectivity plugin: No interfaces have been configured.");
663     return (-1);
664   }
665
666   return (start_thread());
667 } /* }}} int connectivity_init */
668
669 static int connectivity_config(const char *key, const char *value) /* {{{ */
670 {
671   if (strcasecmp(key, "Interface") == 0) {
672     interfacelist_t *il;
673     char *interface;
674
675     il = malloc(sizeof(*il));
676     if (il == NULL) {
677       char errbuf[1024];
678       ERROR("connectivity plugin: malloc failed during connectivity_config: %s",
679             sstrerror(errno, errbuf, sizeof(errbuf)));
680       return (1);
681     }
682
683     interface = strdup(value);
684     if (interface == NULL) {
685       char errbuf[1024];
686       sfree(il);
687       ERROR("connectivity plugin: strdup failed connectivity_config: %s",
688             sstrerror(errno, errbuf, sizeof(errbuf)));
689       return (1);
690     }
691
692     il->interface = interface;
693     il->status = 2; // "unknown"
694     il->prev_status = 2;
695     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
696     il->sent = 0;
697     il->next = interfacelist_head;
698     interfacelist_head = il;
699
700   } else {
701     return (-1);
702   }
703
704   return (0);
705 } /* }}} int connectivity_config */
706
707 static void connectivity_dispatch_notification(
708     const char *interface, const char *type, /* {{{ */
709     gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
710   char *buf = NULL;
711   notification_t n = {
712       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
713
714   if (value == 1)
715     n.severity = NOTIF_OKAY;
716
717   char hostname[1024];
718   gethostname(hostname, sizeof(hostname));
719
720   sstrncpy(n.host, hostname, sizeof(n.host));
721   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
722   sstrncpy(n.type, "gauge", sizeof(n.type));
723   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
724
725   gen_message_payload(value, old_value, interface, timestamp, &buf);
726
727   notification_meta_t *m = calloc(1, sizeof(*m));
728
729   if (m == NULL) {
730     char errbuf[1024];
731     sfree(buf);
732     ERROR("connectivity plugin: unable to allocate metadata: %s",
733           sstrerror(errno, errbuf, sizeof(errbuf)));
734     return;
735   }
736
737   sstrncpy(m->name, "ves", sizeof(m->name));
738   m->nm_value.nm_string = sstrdup(buf);
739   m->type = NM_TYPE_STRING;
740   n.meta = m;
741
742   DEBUG("connectivity plugin: notification message: %s",
743         n.meta->nm_value.nm_string);
744
745   DEBUG("connectivity plugin: dispatching state %d for interface %s",
746         (int)value, interface);
747
748   plugin_dispatch_notification(&n);
749   plugin_notification_meta_free(n.meta);
750
751   // malloc'd in gen_message_payload
752   if (buf != NULL)
753     sfree(buf);
754 }
755
756 static int connectivity_read(void) /* {{{ */
757 {
758   if (connectivity_thread_error != 0) {
759     ERROR("connectivity plugin: The interface thread had a problem. Restarting "
760           "it.");
761
762     stop_thread(0);
763
764     for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) {
765       il->status = 2; // signifies "unknown"
766       il->prev_status = 2;
767       il->sent = 0;
768     }
769
770     start_thread();
771
772     return (-1);
773   } /* if (connectivity_thread_error != 0) */
774
775   for (interfacelist_t *il = interfacelist_head; il != NULL;
776        il = il->next) /* {{{ */
777   {
778     uint32_t status;
779     uint32_t prev_status;
780     uint32_t sent;
781
782     pthread_mutex_lock(&connectivity_lock);
783
784     status = il->status;
785     prev_status = il->prev_status;
786     sent = il->sent;
787
788     if (status != prev_status && sent == 0) {
789       connectivity_dispatch_notification(il->interface, "gauge", status,
790                                          prev_status, il->timestamp);
791       il->sent = 1;
792     }
793
794     pthread_mutex_unlock(&connectivity_lock);
795   } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */
796
797   return (0);
798 } /* }}} int connectivity_read */
799
800 static int connectivity_shutdown(void) /* {{{ */
801 {
802   interfacelist_t *il;
803
804   DEBUG("connectivity plugin: Shutting down thread.");
805   if (stop_thread(1) < 0)
806     return (-1);
807
808   il = interfacelist_head;
809   while (il != NULL) {
810     interfacelist_t *il_next;
811
812     il_next = il->next;
813
814     sfree(il->interface);
815     sfree(il);
816
817     il = il_next;
818   }
819
820   return (0);
821 } /* }}} int connectivity_shutdown */
822
823 void module_register(void) {
824   plugin_register_config("connectivity", connectivity_config, config_keys,
825                          config_keys_num);
826   plugin_register_init("connectivity", connectivity_init);
827   plugin_register_read("connectivity", connectivity_read);
828   plugin_register_shutdown("connectivity", connectivity_shutdown);
829 } /* void module_register */