2 * collectd - src/sysevent.c
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
24 * Andrew Bays <abays at redhat.com>
31 #include "utils_complain.h"
33 #include <asm/types.h>
36 #include <netinet/in.h>
41 #include <sys/socket.h>
43 #include <yajl/yajl_tree.h>
45 #define SYSEVENT_REGEX_MATCHES 1
58 struct regexfilterlist_s {
60 regex_t regex_filter_obj;
62 struct regexfilterlist_s *next;
64 typedef struct regexfilterlist_s regexfilterlist_t;
70 static int sysevent_thread_loop = 0;
71 static int sysevent_thread_error = 0;
72 static pthread_t sysevent_thread_id;
73 static pthread_mutex_t sysevent_lock = PTHREAD_MUTEX_INITIALIZER;
75 static circbuf_t ring;
77 static char *listen_ip;
78 static char *listen_port;
79 static int listen_buffer_size = 1024;
80 static int buffer_length = 10;
82 static regexfilterlist_t *regexfilterlist_head = NULL;
84 static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
85 static const char *rsyslog_field_keys[4] = {"facility", "severity", "program",
92 static void *sysevent_thread(void *arg) /* {{{ */
94 pthread_mutex_lock(&sysevent_lock);
96 while (sysevent_thread_loop > 0) {
99 pthread_mutex_unlock(&sysevent_lock);
104 char buffer[listen_buffer_size];
105 struct sockaddr_storage src_addr;
106 socklen_t src_addr_len = sizeof(src_addr);
108 memset(buffer, '\0', listen_buffer_size);
110 ssize_t count = recvfrom(sock, buffer, sizeof(buffer), 0,
111 (struct sockaddr *)&src_addr, &src_addr_len);
114 ERROR("sysevent plugin: failed to receive data: %s", strerror(errno));
116 } else if (count >= sizeof(buffer)) {
117 WARNING("sysevent plugin: datagram too large for buffer: truncated");
120 // 2. Push to buffer if there is room, otherwise raise warning
122 pthread_mutex_lock(&sysevent_lock);
124 int next = ring.head + 1;
125 if (next >= ring.maxLen)
128 if (next == ring.tail) {
129 WARNING("sysevent plugin: ring buffer full");
131 DEBUG("sysevent plugin: writing %s", buffer);
133 strncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
137 pthread_mutex_unlock(&sysevent_lock);
142 pthread_mutex_lock(&sysevent_lock);
145 WARNING("sysevent plugin: problem with thread status: %d", status);
146 sysevent_thread_error = 1;
150 if (sysevent_thread_loop <= 0)
152 } /* while (sysevent_thread_loop > 0) */
154 pthread_mutex_unlock(&sysevent_lock);
156 // pthread_exit instead of return
158 } /* }}} void *sysevent_thread */
160 static int start_thread(void) /* {{{ */
164 pthread_mutex_lock(&sysevent_lock);
166 if (sysevent_thread_loop != 0) {
167 pthread_mutex_unlock(&sysevent_lock);
171 sysevent_thread_loop = 1;
172 sysevent_thread_error = 0;
174 DEBUG("sysevent plugin: starting thread");
176 status = plugin_thread_create(&sysevent_thread_id, /* attr = */ NULL,
178 /* arg = */ (void *)0, "sysevent");
180 sysevent_thread_loop = 0;
181 ERROR("sysevent plugin: starting thread failed.");
182 pthread_mutex_unlock(&sysevent_lock);
186 pthread_mutex_unlock(&sysevent_lock);
188 } /* }}} int start_thread */
190 static int stop_thread(int shutdown) /* {{{ */
194 pthread_mutex_lock(&sysevent_lock);
196 if (sysevent_thread_loop == 0) {
197 pthread_mutex_unlock(&sysevent_lock);
201 sysevent_thread_loop = 0;
202 pthread_mutex_unlock(&sysevent_lock);
205 // Since the thread is blocking, calling pthread_join
206 // doesn't actually succeed in stopping it. It will stick around
207 // until a message is received on the socket (at which
208 // it will realize that "sysevent_thread_loop" is 0 and will
209 // break out of the read loop and be allowed to die). This is
210 // fine when the process isn't supposed to be exiting, but in
211 // the case of a process shutdown, we don't want to have an
212 // idle thread hanging around. Calling pthread_cancel here in
213 // the case of a shutdown is just assures that the thread is
214 // gone and that the process has been fully terminated.
216 DEBUG("sysevent plugin: Canceling thread for process shutdown");
218 status = pthread_cancel(sysevent_thread_id);
221 ERROR("sysevent plugin: Unable to cancel thread: %d (%s)", status,
226 status = pthread_join(sysevent_thread_id, /* return = */ NULL);
228 ERROR("sysevent plugin: Stopping thread failed.");
233 pthread_mutex_lock(&sysevent_lock);
234 memset(&sysevent_thread_id, 0, sizeof(sysevent_thread_id));
235 sysevent_thread_error = 0;
236 pthread_mutex_unlock(&sysevent_lock);
238 DEBUG("sysevent plugin: Finished requesting stop of thread");
241 } /* }}} int stop_thread */
243 static int sysevent_init(void) /* {{{ */
247 ring.maxLen = buffer_length;
248 ring.buffer = (char **)malloc(buffer_length * sizeof(char *));
250 for (int i = 0; i < buffer_length; i++) {
251 ring.buffer[i] = malloc(listen_buffer_size);
255 const char *hostname = listen_ip;
256 const char *portname = listen_port;
257 struct addrinfo hints;
258 memset(&hints, 0, sizeof(hints));
259 hints.ai_family = AF_UNSPEC;
260 hints.ai_socktype = SOCK_DGRAM;
261 hints.ai_protocol = 0;
262 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
263 struct addrinfo *res = 0;
265 int err = getaddrinfo(hostname, portname, &hints, &res);
268 ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
274 sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
276 ERROR("sysevent plugin: failed to open socket: %s", strerror(errno));
281 if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
282 ERROR("sysevent plugin: failed to bind socket: %s", strerror(errno));
290 DEBUG("sysevent plugin: socket created and bound");
292 return (start_thread());
293 } /* }}} int sysevent_init */
295 static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
297 if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
298 ci->values[1].type != OCONFIG_TYPE_STRING) {
299 ERROR("sysevent plugin: The `%s' config option needs "
300 "two string arguments (ip and port).",
305 listen_ip = strdup(ci->values[0].value.string);
306 listen_port = strdup(ci->values[1].value.string);
311 static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
315 if (cf_util_get_int(ci, &tmp) != 0)
317 else if ((tmp >= 1024) && (tmp <= 65535))
318 listen_buffer_size = tmp;
321 "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
328 static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
332 if (cf_util_get_int(ci, &tmp) != 0)
334 else if ((tmp >= 3) && (tmp <= 1024))
337 WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 1024.");
344 static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
346 if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
347 ERROR("sysevent plugin: The `%s' config option needs "
348 "one string argument, a regular expression.",
353 regexfilterlist_t *rl;
358 regexp_str = strdup(ci->values[0].value.string);
360 status = regcomp(®exp, regexp_str, REG_EXTENDED);
363 ERROR("sysevent plugin: 'RegexFilter' invalid regular expression: %s",
368 rl = malloc(sizeof(*rl));
371 ERROR("sysevent plugin: malloc failed during "
372 "sysevent_config_add_regex_filter: %s",
373 sstrerror(errno, errbuf, sizeof(errbuf)));
377 rl->regex_filter = regexp_str;
378 rl->regex_filter_obj = regexp;
379 rl->next = regexfilterlist_head;
380 regexfilterlist_head = rl;
385 static int sysevent_config(oconfig_item_t *ci) /* {{{ */
387 for (int i = 0; i < ci->children_num; i++) {
388 oconfig_item_t *child = ci->children + i;
390 if (strcasecmp("Listen", child->key) == 0)
391 sysevent_config_add_listen(child);
392 else if (strcasecmp("BufferSize", child->key) == 0)
393 sysevent_config_add_buffer_size(child);
394 else if (strcasecmp("BufferLength", child->key) == 0)
395 sysevent_config_add_buffer_length(child);
396 else if (strcasecmp("RegexFilter", child->key) == 0)
397 sysevent_config_add_regex_filter(child);
399 WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
404 } /* }}} int sysevent_config */
407 static void submit(const char *message, yajl_val *node,
408 const char *type, /* {{{ */
410 value_list_t vl = VALUE_LIST_INIT;
412 vl.values = &(value_t){.gauge = value};
414 sstrncpy(vl.plugin, "sysevent", sizeof(vl.plugin));
415 sstrncpy(vl.type, type, sizeof(vl.type));
417 // Create metadata to store JSON key-values
418 meta_data_t *meta = meta_data_create();
421 // If we have a parsed-JSON node to work with, use that
424 for (i = 0; i < sizeof(rsyslog_keys) / sizeof(*rsyslog_keys); i++) {
425 char json_val[listen_buffer_size];
426 const char *key = (const char *)rsyslog_keys[i];
427 const char *path[] = {key, (const char *)0};
428 yajl_val v = yajl_tree_get(*node, path, yajl_t_string);
430 memset(json_val, '\0', listen_buffer_size);
432 sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0');
434 DEBUG("sysevent plugin: adding jsonval: %s", json_val);
436 meta_data_add_string(meta, rsyslog_keys[i], json_val);
439 for (i = 0; i < sizeof(rsyslog_field_keys) / sizeof(*rsyslog_field_keys);
441 char json_val[listen_buffer_size];
442 const char *key = (const char *)rsyslog_field_keys[i];
443 const char *path[] = {"@fields", key, (const char *)0};
444 yajl_val v = yajl_tree_get(*node, path, yajl_t_string);
446 memset(json_val, '\0', listen_buffer_size);
448 sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0');
450 DEBUG("sysevent plugin: adding jsonval: %s", json_val);
452 meta_data_add_string(meta, rsyslog_field_keys[i], json_val);
455 // Data was not sent in JSON format, so just treat the whole log entry
457 meta_data_add_string(meta, "@message", strdup(message));
462 DEBUG("sysevent plugin: dispatching message");
464 plugin_dispatch_values(&vl);
465 } /* }}} void sysevent_submit */
467 static int sysevent_read(void) /* {{{ */
469 if (sysevent_thread_error != 0) {
470 ERROR("sysevent plugin: The sysevent thread had a problem (%d). Restarting "
472 sysevent_thread_error);
479 } /* if (sysevent_thread_error != 0) */
481 pthread_mutex_lock(&sysevent_lock);
483 while (ring.head != ring.tail) {
485 char *match_str = NULL;
486 regexfilterlist_t *rl = regexfilterlist_head;
487 int next = ring.tail + 1;
489 if (next >= ring.maxLen)
492 DEBUG("sysevent plugin: reading %s", ring.buffer[ring.tail]);
494 // Try to parse JSON, and if it fails, fall back to plain string
500 node = yajl_tree_parse((const char *)ring.buffer[ring.tail], errbuf,
506 // If we have any regex filters, we need to see if the message portion of
507 // the data matches any of them (otherwise we're not interested)
508 if (regexfilterlist_head != NULL) {
509 char json_val[listen_buffer_size];
510 const char *path[] = {"@message", (const char *)0};
511 yajl_val v = yajl_tree_get(node, path, yajl_t_string);
513 memset(json_val, '\0', listen_buffer_size);
515 sprintf(json_val, "%s%c", YAJL_GET_STRING(v), '\0');
517 match_str = (char *)&json_val;
520 // non-JSON rsyslog data
522 // If we have any regex filters, we need to see if the message data
523 // matches any of them (otherwise we're not interested)
524 if (regexfilterlist_head != NULL)
525 match_str = ring.buffer[ring.tail];
528 // If we care about matching, do that comparison here
529 if (match_str != NULL) {
533 regmatch_t matches[SYSEVENT_REGEX_MATCHES];
535 is_match = (regexec(&rl->regex_filter_obj, match_str,
536 SYSEVENT_REGEX_MATCHES, matches, 0) == 0
541 DEBUG("sysevent plugin: regex filter match: %s", rl->regex_filter);
549 if (is_match == 1 && node != NULL)
550 submit(NULL, &node, "gauge", 1);
551 else if (is_match == 1)
552 submit(ring.buffer[ring.tail], NULL, "gauge", 1);
555 yajl_tree_free(node);
560 pthread_mutex_unlock(&sysevent_lock);
563 } /* }}} int sysevent_read */
565 static int sysevent_shutdown(void) /* {{{ */
568 regexfilterlist_t *rl;
570 DEBUG("sysevent plugin: Shutting down thread.");
571 if (stop_thread(1) < 0)
575 status = close(sock);
577 ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
587 for (int i = 0; i < buffer_length; i++) {
588 free(ring.buffer[i]);
593 rl = regexfilterlist_head;
595 regexfilterlist_t *rl_next;
599 free(rl->regex_filter);
600 regfree(&rl->regex_filter_obj);
608 } /* }}} int sysevent_shutdown */
610 void module_register(void) {
611 plugin_register_complex_config("sysevent", sysevent_config);
612 plugin_register_init("sysevent", sysevent_init);
613 plugin_register_read("sysevent", sysevent_read);
614 plugin_register_shutdown("sysevent", sysevent_shutdown);
615 } /* void module_register */