snake case for iface list + payload simplification
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <net/if.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <libmnl/libmnl.h>
45 #include <linux/netlink.h>
46 #include <linux/rtnetlink.h>
47
48 #include <yajl/yajl_common.h>
49 #include <yajl/yajl_gen.h>
50 #if HAVE_YAJL_YAJL_VERSION_H
51 #include <yajl/yajl_version.h>
52 #endif
53 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
54 #define HAVE_YAJL_V2 1
55 #endif
56
57 #define MYPROTO NETLINK_ROUTE
58
59 #define LINK_STATE_DOWN 0
60 #define LINK_STATE_UP 1
61 #define LINK_STATE_UNKNOWN 2
62
63 #define CONNECTIVITY_DOMAIN_FIELD "domain"
64 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
65 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
66 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
67 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
68 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
69 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
70 #define CONNECTIVITY_PRIORITY_FIELD "priority"
71 #define CONNECTIVITY_PRIORITY_VALUE "high"
72 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
74 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
75 #define CONNECTIVITY_SEQUENCE_VALUE "0"
76 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
77 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
78 #define CONNECTIVITY_VERSION_FIELD "version"
79 #define CONNECTIVITY_VERSION_VALUE "1.0"
80
81 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
82 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
83 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
84 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
85 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
86 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
87 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
89   "stateChangeFieldsVersion"
90 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
91 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
92
93 /*
94  * Private data types
95  */
96 struct interface_list_s {
97   char *interface;
98
99   uint32_t status;
100   uint32_t prev_status;
101   uint32_t sent;
102   long long unsigned int timestamp;
103
104   struct interface_list_s *next;
105 };
106 typedef struct interface_list_s interface_list_t;
107
108 /*
109  * Private variables
110  */
111 static interface_list_t *interface_list_head = NULL;
112
113 static int connectivity_thread_loop = 0;
114 static int connectivity_thread_error = 0;
115 static pthread_t connectivity_thread_id;
116 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
117 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
118 static struct mnl_socket *sock;
119 static int event_id = 0;
120
121 static const char *config_keys[] = {"Interface"};
122 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
123
124 /*
125  * Private functions
126  */
127
128 static int gen_message_payload(int state, int old_state, const char *interface,
129                                long long unsigned int timestamp, char **buf) {
130   const unsigned char *buf2;
131   yajl_gen g;
132   char json_str[DATA_MAX_NAME_LEN];
133
134 #if !defined(HAVE_YAJL_V2)
135   yajl_gen_config conf = {};
136
137   conf.beautify = 0;
138 #endif
139
140 #if HAVE_YAJL_V2
141   size_t len;
142   g = yajl_gen_alloc(NULL);
143   yajl_gen_config(g, yajl_gen_beautify, 0);
144 #else
145   unsigned int len;
146   g = yajl_gen_alloc(&conf, NULL);
147 #endif
148
149   yajl_gen_clear(g);
150
151   // *** BEGIN common event header ***
152
153   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
154     goto err;
155
156   // domain
157   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
158                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
159     goto err;
160
161   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
162                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
163     goto err;
164
165   // eventId
166   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
167                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
168       yajl_gen_status_ok)
169     goto err;
170
171   event_id = event_id + 1;
172   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
173   memset(json_str, '\0', DATA_MAX_NAME_LEN);
174   snprintf(json_str, event_id_len, "%d", event_id);
175
176   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
177     goto err;
178   }
179
180   // eventName
181   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
182                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
183       yajl_gen_status_ok)
184     goto err;
185
186   int event_name_len = 0;
187   event_name_len = event_name_len + strlen(interface);    // interface name
188   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
189   event_name_len =
190       event_name_len + 12; // "interface", 2 spaces and null-terminator
191   memset(json_str, '\0', DATA_MAX_NAME_LEN);
192   snprintf(json_str, event_name_len, "interface %s %s", interface,
193            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
194                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
195
196   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
197       yajl_gen_status_ok) {
198     goto err;
199   }
200
201   // lastEpochMicrosec
202   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
203                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
204       yajl_gen_status_ok)
205     goto err;
206
207   int last_epoch_microsec_len =
208       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
209   memset(json_str, '\0', DATA_MAX_NAME_LEN);
210   snprintf(json_str, last_epoch_microsec_len, "%llu",
211            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
212
213   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
214     goto err;
215   }
216
217   // priority
218   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
219                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
220       yajl_gen_status_ok)
221     goto err;
222
223   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
224                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
225       yajl_gen_status_ok)
226     goto err;
227
228   // reportingEntityName
229   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
230                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
231       yajl_gen_status_ok)
232     goto err;
233
234   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
235                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
236       yajl_gen_status_ok)
237     goto err;
238
239   // sequence
240   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
241                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
242       yajl_gen_status_ok)
243     goto err;
244
245   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
246                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
247       yajl_gen_status_ok)
248     goto err;
249
250   // sourceName
251   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
252                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
253       yajl_gen_status_ok)
254     goto err;
255
256   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
257       yajl_gen_status_ok)
258     goto err;
259
260   // startEpochMicrosec
261   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
262                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
263       yajl_gen_status_ok)
264     goto err;
265
266   int start_epoch_microsec_len =
267       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
268   memset(json_str, '\0', DATA_MAX_NAME_LEN);
269   snprintf(json_str, start_epoch_microsec_len, "%llu",
270            (long long unsigned int)timestamp);
271
272   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
273     goto err;
274   }
275
276   // version
277   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
278                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
279     goto err;
280
281   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
282                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
283     goto err;
284
285   // *** END common event header ***
286
287   // *** BEGIN state change fields ***
288
289   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
290                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
291       yajl_gen_status_ok)
292     goto err;
293
294   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
295     goto err;
296
297   // newState
298   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
299                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
300       yajl_gen_status_ok)
301     goto err;
302
303   int new_state_len =
304       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
305                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
306
307   if (yajl_gen_string(
308           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
309                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
310           new_state_len) != yajl_gen_status_ok)
311     goto err;
312
313   // oldState
314   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
315                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
316       yajl_gen_status_ok)
317     goto err;
318
319   int old_state_len =
320       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
321                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
322
323   if (yajl_gen_string(
324           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
325                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
326           old_state_len) != yajl_gen_status_ok)
327     goto err;
328
329   // stateChangeFieldsVersion
330   if (yajl_gen_string(g,
331                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
332                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
333       yajl_gen_status_ok)
334     goto err;
335
336   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
337                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
338       yajl_gen_status_ok)
339     goto err;
340
341   // stateInterface
342   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
343                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
344       yajl_gen_status_ok)
345     goto err;
346
347   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
348       yajl_gen_status_ok)
349     goto err;
350
351   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
352     goto err;
353
354   // *** END state change fields ***
355
356   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
357     goto err;
358
359   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
360     goto err;
361
362   *buf = malloc(strlen((char *)buf2) + 1);
363
364   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
365
366   yajl_gen_free(g);
367
368   return 0;
369
370 err:
371   yajl_gen_free(g);
372   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
373   return -1;
374 }
375
376 static int connectivity_link_state(struct nlmsghdr *msg) {
377   int retval = 0;
378   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
379   struct nlattr *attr;
380   const char *dev = NULL;
381
382   pthread_mutex_lock(&connectivity_lock);
383
384   interface_list_t *il;
385
386   /* Scan attribute list for device name. */
387   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
388     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
389       continue;
390
391     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
392       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
393             "mnl_attr_validate "
394             "failed.");
395       pthread_mutex_unlock(&connectivity_lock);
396       return MNL_CB_ERROR;
397     }
398
399     dev = mnl_attr_get_str(attr);
400
401     for (il = interface_list_head; il != NULL; il = il->next)
402       if (strcmp(dev, il->interface) == 0)
403         break;
404
405     if (il == NULL) {
406       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
407             "interface: %s",
408             dev);
409     } else {
410       uint32_t prev_status;
411
412       prev_status = il->status;
413       il->status =
414           ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
415       il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
416
417       // If the new status is different than the previous status,
418       // store the previous status and set sent to zero
419       if (il->status != prev_status) {
420         il->prev_status = prev_status;
421         il->sent = 0;
422       }
423
424       DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
425             il->timestamp, dev,
426             ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
427     }
428
429     // no need to loop again, we found the interface name attr
430     // (otherwise the first if-statement in the loop would
431     // have moved us on with 'continue')
432     break;
433   }
434
435   pthread_mutex_unlock(&connectivity_lock);
436
437   return retval;
438 }
439
440 static int msg_handler(struct nlmsghdr *msg) {
441   switch (msg->nlmsg_type) {
442   case RTM_NEWADDR:
443   case RTM_DELADDR:
444   case RTM_NEWROUTE:
445   case RTM_DELROUTE:
446   case RTM_DELLINK:
447     // Not of interest in current version
448     break;
449   case RTM_NEWLINK:
450     connectivity_link_state(msg);
451     break;
452   default:
453     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
454           msg->nlmsg_type);
455     break;
456   }
457   return 0;
458 }
459
460 static int read_event(struct mnl_socket *nl,
461                       int (*msg_handler)(struct nlmsghdr *)) {
462   int status;
463   int ret = 0;
464   char buf[4096];
465   struct nlmsghdr *h;
466
467   if (nl == NULL)
468     return ret;
469
470   status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
471
472   if (status < 0) {
473     /* Socket non-blocking so bail out once we have read everything */
474     if (errno == EWOULDBLOCK || errno == EAGAIN)
475       return ret;
476
477     /* Anything else is an error */
478     ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
479           status);
480     return status;
481   }
482
483   if (status == 0) {
484     DEBUG("connectivity plugin: read_event: EOF\n");
485   }
486
487   /* We need to handle more than one message per 'recvmsg' */
488   for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
489        h = NLMSG_NEXT(h, status)) {
490     /* Finish reading */
491     if (h->nlmsg_type == NLMSG_DONE)
492       return ret;
493
494     /* Message is some kind of error */
495     if (h->nlmsg_type == NLMSG_ERROR) {
496       ERROR("connectivity plugin: read_event: Message is an error\n");
497       return -1; // Error
498     }
499
500     /* Call message handler */
501     if (msg_handler) {
502       ret = (*msg_handler)(h);
503       if (ret < 0) {
504         ERROR("connectivity plugin: read_event: Message handler error %d\n",
505               ret);
506         return ret;
507       }
508     } else {
509       ERROR("connectivity plugin: read_event: Error NULL message handler\n");
510       return -1;
511     }
512   }
513
514   return ret;
515 }
516
517 static void *connectivity_thread(void *arg) /* {{{ */
518 {
519   pthread_mutex_lock(&connectivity_lock);
520
521   while (connectivity_thread_loop > 0) {
522     int status;
523
524     pthread_mutex_unlock(&connectivity_lock);
525
526     status = read_event(sock, msg_handler);
527
528     pthread_mutex_lock(&connectivity_lock);
529
530     if (status < 0) {
531       connectivity_thread_error = 1;
532       break;
533     }
534
535     if (connectivity_thread_loop <= 0)
536       break;
537   } /* while (connectivity_thread_loop > 0) */
538
539   pthread_mutex_unlock(&connectivity_lock);
540
541   return ((void *)0);
542 } /* }}} void *connectivity_thread */
543
544 static int start_thread(void) /* {{{ */
545 {
546   int status;
547
548   pthread_mutex_lock(&connectivity_lock);
549
550   if (connectivity_thread_loop != 0) {
551     pthread_mutex_unlock(&connectivity_lock);
552     return (0);
553   }
554
555   connectivity_thread_loop = 1;
556   connectivity_thread_error = 0;
557
558   if (sock == NULL) {
559     sock = mnl_socket_open(NETLINK_ROUTE);
560     if (sock == NULL) {
561       ERROR(
562           "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
563       pthread_mutex_unlock(&connectivity_lock);
564       return (-1);
565     }
566
567     if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
568       ERROR(
569           "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
570       pthread_mutex_unlock(&connectivity_lock);
571       return (1);
572     }
573   }
574
575   status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
576                                 connectivity_thread,
577                                 /* arg = */ (void *)0, "connectivity");
578   if (status != 0) {
579     connectivity_thread_loop = 0;
580     ERROR("connectivity plugin: Starting thread failed.");
581     pthread_mutex_unlock(&connectivity_lock);
582     mnl_socket_close(sock);
583     return (-1);
584   }
585
586   pthread_mutex_unlock(&connectivity_lock);
587   return (0);
588 } /* }}} int start_thread */
589
590 static int stop_thread(int shutdown) /* {{{ */
591 {
592   int status;
593
594   if (sock != NULL)
595     mnl_socket_close(sock);
596
597   pthread_mutex_lock(&connectivity_lock);
598
599   if (connectivity_thread_loop == 0) {
600     pthread_mutex_unlock(&connectivity_lock);
601     return (-1);
602   }
603
604   connectivity_thread_loop = 0;
605   pthread_cond_broadcast(&connectivity_cond);
606   pthread_mutex_unlock(&connectivity_lock);
607
608   if (shutdown == 1) {
609     // Since the thread is blocking, calling pthread_join
610     // doesn't actually succeed in stopping it.  It will stick around
611     // until a NETLINK message is received on the socket (at which
612     // it will realize that "connectivity_thread_loop" is 0 and will
613     // break out of the read loop and be allowed to die).  This is
614     // fine when the process isn't supposed to be exiting, but in
615     // the case of a process shutdown, we don't want to have an
616     // idle thread hanging around.  Calling pthread_cancel here in
617     // the case of a shutdown is just assures that the thread is
618     // gone and that the process has been fully terminated.
619
620     DEBUG("connectivity plugin: Canceling thread for process shutdown");
621
622     status = pthread_cancel(connectivity_thread_id);
623
624     if (status != 0) {
625       ERROR("connectivity plugin: Unable to cancel thread: %d", status);
626       status = -1;
627     }
628   } else {
629     status = pthread_join(connectivity_thread_id, /* return = */ NULL);
630     if (status != 0) {
631       ERROR("connectivity plugin: Stopping thread failed.");
632       status = -1;
633     }
634   }
635
636   pthread_mutex_lock(&connectivity_lock);
637   memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
638   connectivity_thread_error = 0;
639   pthread_mutex_unlock(&connectivity_lock);
640
641   DEBUG("connectivity plugin: Finished requesting stop of thread");
642
643   return (status);
644 } /* }}} int stop_thread */
645
646 static int connectivity_init(void) /* {{{ */
647 {
648   if (interface_list_head == NULL) {
649     NOTICE("connectivity plugin: No interfaces have been configured.");
650     return (-1);
651   }
652
653   return (start_thread());
654 } /* }}} int connectivity_init */
655
656 static int connectivity_config(const char *key, const char *value) /* {{{ */
657 {
658   if (strcasecmp(key, "Interface") == 0) {
659     interface_list_t *il;
660     char *interface;
661
662     il = malloc(sizeof(*il));
663     if (il == NULL) {
664       char errbuf[1024];
665       ERROR("connectivity plugin: malloc failed during connectivity_config: %s",
666             sstrerror(errno, errbuf, sizeof(errbuf)));
667       return (1);
668     }
669
670     interface = strdup(value);
671     if (interface == NULL) {
672       char errbuf[1024];
673       sfree(il);
674       ERROR("connectivity plugin: strdup failed connectivity_config: %s",
675             sstrerror(errno, errbuf, sizeof(errbuf)));
676       return (1);
677     }
678
679     il->interface = interface;
680     il->status = LINK_STATE_UNKNOWN;
681     il->prev_status = LINK_STATE_UNKNOWN;
682     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
683     il->sent = 0;
684     il->next = interface_list_head;
685     interface_list_head = il;
686
687   } else {
688     return (-1);
689   }
690
691   return (0);
692 } /* }}} int connectivity_config */
693
694 static void connectivity_dispatch_notification(
695     const char *interface, const char *type, /* {{{ */
696     gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
697   char *buf = NULL;
698   notification_t n = {
699       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
700
701   if (value == LINK_STATE_UP)
702     n.severity = NOTIF_OKAY;
703
704   char hostname[1024];
705   gethostname(hostname, sizeof(hostname));
706
707   sstrncpy(n.host, hostname, sizeof(n.host));
708   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
709   sstrncpy(n.type, "gauge", sizeof(n.type));
710   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
711
712   gen_message_payload(value, old_value, interface, timestamp, &buf);
713
714   notification_meta_t *m = calloc(1, sizeof(*m));
715
716   if (m == NULL) {
717     char errbuf[1024];
718     sfree(buf);
719     ERROR("connectivity plugin: unable to allocate metadata: %s",
720           sstrerror(errno, errbuf, sizeof(errbuf)));
721     return;
722   }
723
724   sstrncpy(m->name, "ves", sizeof(m->name));
725   m->nm_value.nm_string = sstrdup(buf);
726   m->type = NM_TYPE_STRING;
727   n.meta = m;
728
729   DEBUG("connectivity plugin: notification message: %s",
730         n.meta->nm_value.nm_string);
731
732   DEBUG("connectivity plugin: dispatching state %d for interface %s",
733         (int)value, interface);
734
735   plugin_dispatch_notification(&n);
736   plugin_notification_meta_free(n.meta);
737
738   // malloc'd in gen_message_payload
739   if (buf != NULL)
740     sfree(buf);
741 }
742
743 static int connectivity_read(void) /* {{{ */
744 {
745   if (connectivity_thread_error != 0) {
746     ERROR("connectivity plugin: The interface thread had a problem. Restarting "
747           "it.");
748
749     stop_thread(0);
750
751     for (interface_list_t *il = interface_list_head; il != NULL;
752          il = il->next) {
753       il->status = LINK_STATE_UNKNOWN;
754       il->prev_status = LINK_STATE_UNKNOWN;
755       il->sent = 0;
756     }
757
758     start_thread();
759
760     return (-1);
761   } /* if (connectivity_thread_error != 0) */
762
763   for (interface_list_t *il = interface_list_head; il != NULL;
764        il = il->next) /* {{{ */
765   {
766     uint32_t status;
767     uint32_t prev_status;
768     uint32_t sent;
769
770     pthread_mutex_lock(&connectivity_lock);
771
772     status = il->status;
773     prev_status = il->prev_status;
774     sent = il->sent;
775
776     if (status != prev_status && sent == 0) {
777       connectivity_dispatch_notification(il->interface, "gauge", status,
778                                          prev_status, il->timestamp);
779       il->sent = 1;
780     }
781
782     pthread_mutex_unlock(&connectivity_lock);
783   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
784
785   return (0);
786 } /* }}} int connectivity_read */
787
788 static int connectivity_shutdown(void) /* {{{ */
789 {
790   interface_list_t *il;
791
792   DEBUG("connectivity plugin: Shutting down thread.");
793   if (stop_thread(1) < 0)
794     return (-1);
795
796   il = interface_list_head;
797   while (il != NULL) {
798     interface_list_t *il_next;
799
800     il_next = il->next;
801
802     sfree(il->interface);
803     sfree(il);
804
805     il = il_next;
806   }
807
808   return (0);
809 } /* }}} int connectivity_shutdown */
810
811 void module_register(void) {
812   plugin_register_config("connectivity", connectivity_config, config_keys,
813                          config_keys_num);
814   plugin_register_init("connectivity", connectivity_init);
815   plugin_register_read("connectivity", connectivity_read);
816   plugin_register_shutdown("connectivity", connectivity_shutdown);
817 } /* void module_register */