X-Git-Url: https://git.octo.it/?a=blobdiff_plain;ds=sidebyside;f=src%2Fwrite_riemann.c;h=780bccbefb2c794cf59f33cfc6f0fe3717d68496;hb=633c3966f770e4d46651a2fe219a18d8a9907a9f;hp=3a7738f626705b7bbdbf966895b751fdf46fada1;hpb=4c6303ec6be673df6c9e0964dfc9419c697bf47c;p=collectd.git diff --git a/src/write_riemann.c b/src/write_riemann.c index 3a7738f6..780bccbe 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -39,11 +39,15 @@ #define RIEMANN_PORT "5555" #define RIEMANN_TTL_FACTOR 2.0 +int write_riemann_threshold_check(const data_set_t *, const value_list_t *, int *); + struct riemann_host { char *name; #define F_CONNECT 0x01 uint8_t flags; pthread_mutex_t lock; + _Bool notifications; + _Bool check_thresholds; _Bool store_rates; _Bool always_append_ds; char *node; @@ -108,8 +112,7 @@ static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */ } /* }}} void riemann_msg_protobuf_free */ /* host->lock must be held when calling this function. */ -static int -riemann_connect(struct riemann_host *host) +static int riemann_connect(struct riemann_host *host) /* {{{ */ { int e; struct addrinfo *ai, *res, hints; @@ -151,7 +154,7 @@ riemann_connect(struct riemann_host *host) } host->flags |= F_CONNECT; - DEBUG("write_riemann plugin: got a succesful connection for: %s:%s", + DEBUG("write_riemann plugin: got a successful connection for: %s:%s", node, service); break; } @@ -164,11 +167,10 @@ riemann_connect(struct riemann_host *host) return -1; } return 0; -} +} /* }}} int riemann_connect */ /* host->lock must be held when calling this function. */ -static int -riemann_disconnect (struct riemann_host *host) +static int riemann_disconnect (struct riemann_host *host) /* {{{ */ { if ((host->flags & F_CONNECT) == 0) return (0); @@ -178,17 +180,15 @@ riemann_disconnect (struct riemann_host *host) host->flags &= ~F_CONNECT; return (0); -} +} /* }}} int riemann_disconnect */ -static inline int -riemann_send_msg(struct riemann_host *host, const Msg *msg) +static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */ { int status = 0; u_char *buffer = NULL; size_t buffer_len; status = riemann_connect (host); - if (status != 0) return status; @@ -198,12 +198,10 @@ riemann_send_msg(struct riemann_host *host, const Msg *msg) buffer_len += 4; buffer = malloc (buffer_len); - if (buffer == NULL) { ERROR ("write_riemann plugin: malloc failed."); return ENOMEM; } - memset (buffer, 0, buffer_len); if (host->use_tcp) @@ -218,26 +216,22 @@ riemann_send_msg(struct riemann_host *host, const Msg *msg) } status = (int) swrite (host->s, buffer, buffer_len); - if (status != 0) { char errbuf[1024]; - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s", (host->node != NULL) ? host->node : RIEMANN_HOST, (host->service != NULL) ? host->service : RIEMANN_PORT, sstrerror (errno, errbuf, sizeof (errbuf))); - sfree (buffer); return -1; } sfree (buffer); return 0; -} +} /* }}} int riemann_send_msg */ -static inline int -riemann_recv_ack(struct riemann_host *host) +static int riemann_recv_ack(struct riemann_host *host) /* {{{ */ { int status = 0; Msg *msg = NULL; @@ -277,21 +271,19 @@ riemann_recv_ack(struct riemann_host *host) msg__free_unpacked (msg, NULL); return 0; -} +} /* }}} int riemann_recv_ack */ /** * Function to send messages (Msg) to riemann. * * Acquires the host lock, disconnects on errors. */ -static int -riemann_send(struct riemann_host *host, Msg const *msg) +static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */ { int status = 0; pthread_mutex_lock (&host->lock); status = riemann_send_msg(host, msg); - if (status != 0) { riemann_disconnect (host); pthread_mutex_unlock (&host->lock); @@ -315,7 +307,7 @@ riemann_send(struct riemann_host *host, Msg const *msg) pthread_mutex_unlock (&host->lock); return 0; -} +} /* }}} int riemann_send */ static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */ { @@ -437,6 +429,9 @@ static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ n->type, n->type_instance); event->service = strdup (&service_buffer[1]); + if (n->message[0] != 0) + riemann_event_add_attribute (event, "description", n->message); + /* Pull in values from threshold and add extra attributes */ for (meta = n->meta; meta != NULL; meta = meta->next) { @@ -462,7 +457,8 @@ static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */ data_set_t const *ds, value_list_t const *vl, size_t index, - gauge_t const *rates) + gauge_t const *rates, + int status) { Event *event; char name_buffer[5 * DATA_MAX_NAME_LEN]; @@ -483,6 +479,23 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ event->time = CDTIME_T_TO_TIME_T (vl->time); event->has_time = 1; + if (host->check_thresholds) { + switch (status) { + case STATE_OKAY: + event->state = strdup("ok"); + break; + case STATE_ERROR: + event->state = strdup("critical"); + break; + case STATE_WARNING: + event->state = strdup("warning"); + break; + case STATE_MISSING: + event->state = strdup("unknown"); + break; + } + } + ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor; event->ttl = (float) ttl; event->has_ttl = 1; @@ -566,8 +579,9 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ } /* }}} Event *riemann_value_to_protobuf */ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl) + data_set_t const *ds, + value_list_t const *vl, + int *statuses) { Msg *msg; size_t i; @@ -607,7 +621,7 @@ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* for (i = 0; i < msg->n_events; i++) { msg->events[i] = riemann_value_to_protobuf (host, ds, vl, - (int) i, rates); + (int) i, rates, statuses[i]); if (msg->events[i] == NULL) { riemann_msg_protobuf_free (msg); @@ -620,13 +634,15 @@ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* return (msg); } /* }}} Msg *riemann_value_list_to_protobuf */ -static int -riemann_notification(const notification_t *n, user_data_t *ud) +static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */ { int status; struct riemann_host *host = ud->data; Msg *msg; + if (!host->notifications) + return 0; + msg = riemann_notification_to_protobuf (host, n); if (msg == NULL) return (-1); @@ -640,16 +656,18 @@ riemann_notification(const notification_t *n, user_data_t *ud) return (status); } /* }}} int riemann_notification */ -static int -riemann_write(const data_set_t *ds, +static int riemann_write(const data_set_t *ds, /* {{{ */ const value_list_t *vl, user_data_t *ud) { int status; + int statuses[vl->values_len]; struct riemann_host *host = ud->data; Msg *msg; - msg = riemann_value_list_to_protobuf (host, ds, vl); + if (host->check_thresholds) + write_riemann_threshold_check(ds, vl, statuses); + msg = riemann_value_list_to_protobuf (host, ds, vl, statuses); if (msg == NULL) return (-1); @@ -660,10 +678,9 @@ riemann_write(const data_set_t *ds, riemann_msg_protobuf_free (msg); return status; -} +} /* }}} int riemann_write */ -static void -riemann_free(void *p) +static void riemann_free(void *p) /* {{{ */ { struct riemann_host *host = p; @@ -684,10 +701,9 @@ riemann_free(void *p) sfree(host->service); pthread_mutex_destroy (&host->lock); sfree(host); -} +} /* }}} void riemann_free */ -static int -riemann_config_node(oconfig_item_t *ci) +static int riemann_config_node(oconfig_item_t *ci) /* {{{ */ { struct riemann_host *host = NULL; int status = 0; @@ -704,6 +720,8 @@ riemann_config_node(oconfig_item_t *ci) host->reference_count = 1; host->node = NULL; host->service = NULL; + host->notifications = 1; + host->check_thresholds = 0; host->store_rates = 1; host->always_append_ds = 0; host->use_tcp = 0; @@ -728,6 +746,14 @@ riemann_config_node(oconfig_item_t *ci) status = cf_util_get_string (child, &host->node); if (status != 0) break; + } else if (strcasecmp ("Notifications", child->key) == 0) { + status = cf_util_get_boolean(child, &host->notifications); + if (status != 0) + break; + } else if (strcasecmp ("CheckThresholds", child->key) == 0) { + status = cf_util_get_boolean(child, &host->check_thresholds); + if (status != 0) + break; } else if (strcasecmp ("Port", child->key) == 0) { status = cf_util_get_service (child, &host->service); if (status != 0) { @@ -843,10 +869,9 @@ riemann_config_node(oconfig_item_t *ci) pthread_mutex_unlock (&host->lock); return status; -} +} /* }}} int riemann_config_node */ -static int -riemann_config(oconfig_item_t *ci) +static int riemann_config(oconfig_item_t *ci) /* {{{ */ { int i; oconfig_item_t *child; @@ -898,11 +923,10 @@ riemann_config(oconfig_item_t *ci) child->key); } } - return (0); -} + return 0; +} /* }}} int riemann_config */ -void -module_register(void) +void module_register(void) { plugin_register_complex_config ("write_riemann", riemann_config); }