cb0ed98d5dd55472ab0001cb9820d2387f31de0d
[collectd.git] / src / connectivity.c
1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27
28 #include "collectd.h"
29
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_complain.h"
33
34 #include <asm/types.h>
35 #include <errno.h>
36 #include <net/if.h>
37 #include <netinet/in.h>
38 #include <pthread.h>
39 #include <stdio.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include <libmnl/libmnl.h>
45 #include <linux/netlink.h>
46 #include <linux/rtnetlink.h>
47
48 #include <yajl/yajl_common.h>
49 #include <yajl/yajl_gen.h>
50 #if HAVE_YAJL_YAJL_VERSION_H
51 #include <yajl/yajl_version.h>
52 #endif
53 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
54 #define HAVE_YAJL_V2 1
55 #endif
56
57 #define MYPROTO NETLINK_ROUTE
58
59 #define LINK_STATE_DOWN 0
60 #define LINK_STATE_UP 1
61 #define LINK_STATE_UNKNOWN 2
62
63 #define CONNECTIVITY_DOMAIN_FIELD "domain"
64 #define CONNECTIVITY_DOMAIN_VALUE "stateChange"
65 #define CONNECTIVITY_EVENT_ID_FIELD "eventId"
66 #define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
67 #define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
68 #define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
69 #define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
70 #define CONNECTIVITY_PRIORITY_FIELD "priority"
71 #define CONNECTIVITY_PRIORITY_VALUE "high"
72 #define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
73 #define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
74 #define CONNECTIVITY_SEQUENCE_FIELD "sequence"
75 #define CONNECTIVITY_SEQUENCE_VALUE "0"
76 #define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
77 #define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
78 #define CONNECTIVITY_VERSION_FIELD "version"
79 #define CONNECTIVITY_VERSION_VALUE "1.0"
80
81 #define CONNECTIVITY_NEW_STATE_FIELD "newState"
82 #define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
83 #define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
84 #define CONNECTIVITY_OLD_STATE_FIELD "oldState"
85 #define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
86 #define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
87 #define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
88 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
89   "stateChangeFieldsVersion"
90 #define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
91 #define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
92
93 /*
94  * Private data types
95  */
96 struct interfacelist_s {
97   char *interface;
98
99   uint32_t status;
100   uint32_t prev_status;
101   uint32_t sent;
102   long long unsigned int timestamp;
103
104   struct interfacelist_s *next;
105 };
106 typedef struct interfacelist_s interfacelist_t;
107
108 /*
109  * Private variables
110  */
111 static interfacelist_t *interfacelist_head = NULL;
112
113 static int connectivity_thread_loop = 0;
114 static int connectivity_thread_error = 0;
115 static pthread_t connectivity_thread_id;
116 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
117 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
118 static struct mnl_socket *sock;
119 static int event_id = 0;
120
121 static const char *config_keys[] = {"Interface"};
122 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
123
124 /*
125  * Private functions
126  */
127
128 static int gen_message_payload(int state, int old_state, const char *interface,
129                                long long unsigned int timestamp, char **buf) {
130   const unsigned char *buf2;
131   yajl_gen g;
132
133 #if !defined(HAVE_YAJL_V2)
134   yajl_gen_config conf = {};
135
136   conf.beautify = 0;
137 #endif
138
139 #if HAVE_YAJL_V2
140   size_t len;
141   g = yajl_gen_alloc(NULL);
142   yajl_gen_config(g, yajl_gen_beautify, 0);
143 #else
144   unsigned int len;
145   g = yajl_gen_alloc(&conf, NULL);
146 #endif
147
148   yajl_gen_clear(g);
149
150   // *** BEGIN common event header ***
151
152   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
153     goto err;
154
155   // domain
156   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
157                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
158     goto err;
159
160   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
161                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
162     goto err;
163
164   // eventId
165   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
166                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
167       yajl_gen_status_ok)
168     goto err;
169
170   event_id = event_id + 1;
171   int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
172   char *event_id_str = malloc(event_id_len);
173   snprintf(event_id_str, event_id_len, "%d", event_id);
174
175   if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
176       yajl_gen_status_ok) {
177     sfree(event_id_str);
178     goto err;
179   }
180
181   sfree(event_id_str);
182
183   // eventName
184   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
185                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
186       yajl_gen_status_ok)
187     goto err;
188
189   int event_name_len = 0;
190   event_name_len = event_name_len + strlen(interface);    // interface name
191   event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
192   event_name_len =
193       event_name_len + 12; // "interface", 2 spaces and null-terminator
194   char *event_name_str = malloc(event_name_len);
195   memset(event_name_str, '\0', event_name_len);
196   snprintf(event_name_str, event_name_len, "interface %s %s", interface,
197            (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
198                        : CONNECTIVITY_EVENT_NAME_UP_VALUE));
199
200   if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
201       yajl_gen_status_ok) {
202     sfree(event_name_str);
203     goto err;
204   }
205
206   sfree(event_name_str);
207
208   // lastEpochMicrosec
209   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
210                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
211       yajl_gen_status_ok)
212     goto err;
213
214   int last_epoch_microsec_len =
215       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
216   char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
217   snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
218            (long long unsigned int)CDTIME_T_TO_US(cdtime()));
219
220   if (yajl_gen_number(g, last_epoch_microsec_str,
221                       strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
222     sfree(last_epoch_microsec_str);
223     goto err;
224   }
225
226   sfree(last_epoch_microsec_str);
227
228   // priority
229   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
230                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
231       yajl_gen_status_ok)
232     goto err;
233
234   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
235                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
236       yajl_gen_status_ok)
237     goto err;
238
239   // reportingEntityName
240   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
241                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
242       yajl_gen_status_ok)
243     goto err;
244
245   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
246                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
247       yajl_gen_status_ok)
248     goto err;
249
250   // sequence
251   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
252                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
253       yajl_gen_status_ok)
254     goto err;
255
256   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
257                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
258       yajl_gen_status_ok)
259     goto err;
260
261   // sourceName
262   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
263                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
264       yajl_gen_status_ok)
265     goto err;
266
267   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
268       yajl_gen_status_ok)
269     goto err;
270
271   // startEpochMicrosec
272   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
273                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
274       yajl_gen_status_ok)
275     goto err;
276
277   int start_epoch_microsec_len =
278       sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
279   char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
280   snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
281            (long long unsigned int)timestamp);
282
283   if (yajl_gen_number(g, start_epoch_microsec_str,
284                       strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
285     sfree(start_epoch_microsec_str);
286     goto err;
287   }
288
289   sfree(start_epoch_microsec_str);
290
291   // version
292   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
293                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
294     goto err;
295
296   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
297                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
298     goto err;
299
300   // *** END common event header ***
301
302   // *** BEGIN state change fields ***
303
304   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
305                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
306       yajl_gen_status_ok)
307     goto err;
308
309   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
310     goto err;
311
312   // newState
313   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
314                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
315       yajl_gen_status_ok)
316     goto err;
317
318   int new_state_len =
319       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
320                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
321
322   if (yajl_gen_string(
323           g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
324                                    : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
325           new_state_len) != yajl_gen_status_ok)
326     goto err;
327
328   // oldState
329   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
330                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
331       yajl_gen_status_ok)
332     goto err;
333
334   int old_state_len =
335       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
336                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
337
338   if (yajl_gen_string(
339           g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
340                                        : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
341           old_state_len) != yajl_gen_status_ok)
342     goto err;
343
344   // stateChangeFieldsVersion
345   if (yajl_gen_string(g,
346                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
347                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
348       yajl_gen_status_ok)
349     goto err;
350
351   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
352                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
353       yajl_gen_status_ok)
354     goto err;
355
356   // stateInterface
357   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
358                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
359       yajl_gen_status_ok)
360     goto err;
361
362   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
363       yajl_gen_status_ok)
364     goto err;
365
366   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
367     goto err;
368
369   // *** END state change fields ***
370
371   if (yajl_gen_map_close(g) != yajl_gen_status_ok)
372     goto err;
373
374   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
375     goto err;
376
377   *buf = malloc(strlen((char *)buf2) + 1);
378
379   sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
380
381   yajl_gen_free(g);
382
383   return 0;
384
385 err:
386   yajl_gen_free(g);
387   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
388   return -1;
389 }
390
391 static int connectivity_link_state(struct nlmsghdr *msg) {
392   int retval = 0;
393   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
394   struct nlattr *attr;
395   const char *dev = NULL;
396
397   pthread_mutex_lock(&connectivity_lock);
398
399   interfacelist_t *il;
400
401   /* Scan attribute list for device name. */
402   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
403     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
404       continue;
405
406     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
407       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
408             "mnl_attr_validate "
409             "failed.");
410       pthread_mutex_unlock(&connectivity_lock);
411       return MNL_CB_ERROR;
412     }
413
414     dev = mnl_attr_get_str(attr);
415
416     for (il = interfacelist_head; il != NULL; il = il->next)
417       if (strcmp(dev, il->interface) == 0)
418         break;
419
420     if (il == NULL) {
421       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
422             "interface: %s",
423             dev);
424     } else {
425       uint32_t prev_status;
426
427       prev_status = il->status;
428       il->status =
429           ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
430       il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
431
432       // If the new status is different than the previous status,
433       // store the previous status and set sent to zero
434       if (il->status != prev_status) {
435         il->prev_status = prev_status;
436         il->sent = 0;
437       }
438
439       DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
440             il->timestamp, dev,
441             ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
442     }
443
444     // no need to loop again, we found the interface name attr
445     // (otherwise the first if-statement in the loop would
446     // have moved us on with 'continue')
447     break;
448   }
449
450   pthread_mutex_unlock(&connectivity_lock);
451
452   return retval;
453 }
454
455 static int msg_handler(struct nlmsghdr *msg) {
456   switch (msg->nlmsg_type) {
457   case RTM_NEWADDR:
458   case RTM_DELADDR:
459   case RTM_NEWROUTE:
460   case RTM_DELROUTE:
461   case RTM_DELLINK:
462     // Not of interest in current version
463     break;
464   case RTM_NEWLINK:
465     connectivity_link_state(msg);
466     break;
467   default:
468     ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
469           msg->nlmsg_type);
470     break;
471   }
472   return 0;
473 }
474
475 static int read_event(struct mnl_socket *nl,
476                       int (*msg_handler)(struct nlmsghdr *)) {
477   int status;
478   int ret = 0;
479   char buf[4096];
480   struct nlmsghdr *h;
481
482   if (nl == NULL)
483     return ret;
484
485   status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
486
487   if (status < 0) {
488     /* Socket non-blocking so bail out once we have read everything */
489     if (errno == EWOULDBLOCK || errno == EAGAIN)
490       return ret;
491
492     /* Anything else is an error */
493     ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
494           status);
495     return status;
496   }
497
498   if (status == 0) {
499     DEBUG("connectivity plugin: read_event: EOF\n");
500   }
501
502   /* We need to handle more than one message per 'recvmsg' */
503   for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
504        h = NLMSG_NEXT(h, status)) {
505     /* Finish reading */
506     if (h->nlmsg_type == NLMSG_DONE)
507       return ret;
508
509     /* Message is some kind of error */
510     if (h->nlmsg_type == NLMSG_ERROR) {
511       ERROR("connectivity plugin: read_event: Message is an error\n");
512       return -1; // Error
513     }
514
515     /* Call message handler */
516     if (msg_handler) {
517       ret = (*msg_handler)(h);
518       if (ret < 0) {
519         ERROR("connectivity plugin: read_event: Message handler error %d\n",
520               ret);
521         return ret;
522       }
523     } else {
524       ERROR("connectivity plugin: read_event: Error NULL message handler\n");
525       return -1;
526     }
527   }
528
529   return ret;
530 }
531
532 static void *connectivity_thread(void *arg) /* {{{ */
533 {
534   pthread_mutex_lock(&connectivity_lock);
535
536   while (connectivity_thread_loop > 0) {
537     int status;
538
539     pthread_mutex_unlock(&connectivity_lock);
540
541     status = read_event(sock, msg_handler);
542
543     pthread_mutex_lock(&connectivity_lock);
544
545     if (status < 0) {
546       connectivity_thread_error = 1;
547       break;
548     }
549
550     if (connectivity_thread_loop <= 0)
551       break;
552   } /* while (connectivity_thread_loop > 0) */
553
554   pthread_mutex_unlock(&connectivity_lock);
555
556   return ((void *)0);
557 } /* }}} void *connectivity_thread */
558
559 static int start_thread(void) /* {{{ */
560 {
561   int status;
562
563   pthread_mutex_lock(&connectivity_lock);
564
565   if (connectivity_thread_loop != 0) {
566     pthread_mutex_unlock(&connectivity_lock);
567     return (0);
568   }
569
570   connectivity_thread_loop = 1;
571   connectivity_thread_error = 0;
572
573   if (sock == NULL) {
574     sock = mnl_socket_open(NETLINK_ROUTE);
575     if (sock == NULL) {
576       ERROR(
577           "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
578       pthread_mutex_unlock(&connectivity_lock);
579       return (-1);
580     }
581
582     if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
583       ERROR(
584           "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
585       pthread_mutex_unlock(&connectivity_lock);
586       return (1);
587     }
588   }
589
590   status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
591                                 connectivity_thread,
592                                 /* arg = */ (void *)0, "connectivity");
593   if (status != 0) {
594     connectivity_thread_loop = 0;
595     ERROR("connectivity plugin: Starting thread failed.");
596     pthread_mutex_unlock(&connectivity_lock);
597     mnl_socket_close(sock);
598     return (-1);
599   }
600
601   pthread_mutex_unlock(&connectivity_lock);
602   return (0);
603 } /* }}} int start_thread */
604
605 static int stop_thread(int shutdown) /* {{{ */
606 {
607   int status;
608
609   if (sock != NULL)
610     mnl_socket_close(sock);
611
612   pthread_mutex_lock(&connectivity_lock);
613
614   if (connectivity_thread_loop == 0) {
615     pthread_mutex_unlock(&connectivity_lock);
616     return (-1);
617   }
618
619   connectivity_thread_loop = 0;
620   pthread_cond_broadcast(&connectivity_cond);
621   pthread_mutex_unlock(&connectivity_lock);
622
623   if (shutdown == 1) {
624     // Since the thread is blocking, calling pthread_join
625     // doesn't actually succeed in stopping it.  It will stick around
626     // until a NETLINK message is received on the socket (at which
627     // it will realize that "connectivity_thread_loop" is 0 and will
628     // break out of the read loop and be allowed to die).  This is
629     // fine when the process isn't supposed to be exiting, but in
630     // the case of a process shutdown, we don't want to have an
631     // idle thread hanging around.  Calling pthread_cancel here in
632     // the case of a shutdown is just assures that the thread is
633     // gone and that the process has been fully terminated.
634
635     DEBUG("connectivity plugin: Canceling thread for process shutdown");
636
637     status = pthread_cancel(connectivity_thread_id);
638
639     if (status != 0) {
640       ERROR("connectivity plugin: Unable to cancel thread: %d", status);
641       status = -1;
642     }
643   } else {
644     status = pthread_join(connectivity_thread_id, /* return = */ NULL);
645     if (status != 0) {
646       ERROR("connectivity plugin: Stopping thread failed.");
647       status = -1;
648     }
649   }
650
651   pthread_mutex_lock(&connectivity_lock);
652   memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
653   connectivity_thread_error = 0;
654   pthread_mutex_unlock(&connectivity_lock);
655
656   DEBUG("connectivity plugin: Finished requesting stop of thread");
657
658   return (status);
659 } /* }}} int stop_thread */
660
661 static int connectivity_init(void) /* {{{ */
662 {
663   if (interfacelist_head == NULL) {
664     NOTICE("connectivity plugin: No interfaces have been configured.");
665     return (-1);
666   }
667
668   return (start_thread());
669 } /* }}} int connectivity_init */
670
671 static int connectivity_config(const char *key, const char *value) /* {{{ */
672 {
673   if (strcasecmp(key, "Interface") == 0) {
674     interfacelist_t *il;
675     char *interface;
676
677     il = malloc(sizeof(*il));
678     if (il == NULL) {
679       char errbuf[1024];
680       ERROR("connectivity plugin: malloc failed during connectivity_config: %s",
681             sstrerror(errno, errbuf, sizeof(errbuf)));
682       return (1);
683     }
684
685     interface = strdup(value);
686     if (interface == NULL) {
687       char errbuf[1024];
688       sfree(il);
689       ERROR("connectivity plugin: strdup failed connectivity_config: %s",
690             sstrerror(errno, errbuf, sizeof(errbuf)));
691       return (1);
692     }
693
694     il->interface = interface;
695     il->status = LINK_STATE_UNKNOWN;
696     il->prev_status = LINK_STATE_UNKNOWN;
697     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
698     il->sent = 0;
699     il->next = interfacelist_head;
700     interfacelist_head = il;
701
702   } else {
703     return (-1);
704   }
705
706   return (0);
707 } /* }}} int connectivity_config */
708
709 static void connectivity_dispatch_notification(
710     const char *interface, const char *type, /* {{{ */
711     gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
712   char *buf = NULL;
713   notification_t n = {
714       NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
715
716   if (value == LINK_STATE_UP)
717     n.severity = NOTIF_OKAY;
718
719   char hostname[1024];
720   gethostname(hostname, sizeof(hostname));
721
722   sstrncpy(n.host, hostname, sizeof(n.host));
723   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
724   sstrncpy(n.type, "gauge", sizeof(n.type));
725   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
726
727   gen_message_payload(value, old_value, interface, timestamp, &buf);
728
729   notification_meta_t *m = calloc(1, sizeof(*m));
730
731   if (m == NULL) {
732     char errbuf[1024];
733     sfree(buf);
734     ERROR("connectivity plugin: unable to allocate metadata: %s",
735           sstrerror(errno, errbuf, sizeof(errbuf)));
736     return;
737   }
738
739   sstrncpy(m->name, "ves", sizeof(m->name));
740   m->nm_value.nm_string = sstrdup(buf);
741   m->type = NM_TYPE_STRING;
742   n.meta = m;
743
744   DEBUG("connectivity plugin: notification message: %s",
745         n.meta->nm_value.nm_string);
746
747   DEBUG("connectivity plugin: dispatching state %d for interface %s",
748         (int)value, interface);
749
750   plugin_dispatch_notification(&n);
751   plugin_notification_meta_free(n.meta);
752
753   // malloc'd in gen_message_payload
754   if (buf != NULL)
755     sfree(buf);
756 }
757
758 static int connectivity_read(void) /* {{{ */
759 {
760   if (connectivity_thread_error != 0) {
761     ERROR("connectivity plugin: The interface thread had a problem. Restarting "
762           "it.");
763
764     stop_thread(0);
765
766     for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) {
767       il->status = LINK_STATE_UNKNOWN;
768       il->prev_status = LINK_STATE_UNKNOWN;
769       il->sent = 0;
770     }
771
772     start_thread();
773
774     return (-1);
775   } /* if (connectivity_thread_error != 0) */
776
777   for (interfacelist_t *il = interfacelist_head; il != NULL;
778        il = il->next) /* {{{ */
779   {
780     uint32_t status;
781     uint32_t prev_status;
782     uint32_t sent;
783
784     pthread_mutex_lock(&connectivity_lock);
785
786     status = il->status;
787     prev_status = il->prev_status;
788     sent = il->sent;
789
790     if (status != prev_status && sent == 0) {
791       connectivity_dispatch_notification(il->interface, "gauge", status,
792                                          prev_status, il->timestamp);
793       il->sent = 1;
794     }
795
796     pthread_mutex_unlock(&connectivity_lock);
797   } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */
798
799   return (0);
800 } /* }}} int connectivity_read */
801
802 static int connectivity_shutdown(void) /* {{{ */
803 {
804   interfacelist_t *il;
805
806   DEBUG("connectivity plugin: Shutting down thread.");
807   if (stop_thread(1) < 0)
808     return (-1);
809
810   il = interfacelist_head;
811   while (il != NULL) {
812     interfacelist_t *il_next;
813
814     il_next = il->next;
815
816     sfree(il->interface);
817     sfree(il);
818
819     il = il_next;
820   }
821
822   return (0);
823 } /* }}} int connectivity_shutdown */
824
825 void module_register(void) {
826   plugin_register_config("connectivity", connectivity_config, config_keys,
827                          config_keys_num);
828   plugin_register_init("connectivity", connectivity_init);
829   plugin_register_read("connectivity", connectivity_read);
830   plugin_register_shutdown("connectivity", connectivity_shutdown);
831 } /* void module_register */