X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_riemann.c;h=a404ff6e19470f10f52f8926d89b7f5225c037e4;hb=9bf68f1807313f29cad9f68873c8feab21ad4746;hp=15bb23787c59f7b4f01d543c621a75fee2be42f4;hpb=38e2d0eff79464e4b9686e135a184411d9307db7;p=collectd.git diff --git a/src/write_riemann.c b/src/write_riemann.c index 15bb2378..a404ff6e 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -37,6 +37,7 @@ #define RIEMANN_HOST "localhost" #define RIEMANN_PORT "5555" +#define RIEMANN_TTL_FACTOR 2.0 struct riemann_host { char *name; @@ -49,6 +50,7 @@ struct riemann_host { char *service; _Bool use_tcp; int s; + double ttl_factor; int reference_count; }; @@ -84,7 +86,7 @@ static void riemann_event_protobuf_free (Event *event) /* {{{ */ sfree (event); } /* }}} void riemann_event_protobuf_free */ -static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */ +static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */ { size_t i; @@ -104,8 +106,7 @@ static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */ } /* }}} void riemann_msg_protobuf_free */ /* host->lock must be held when calling this function. */ -static int -riemann_connect(struct riemann_host *host) +static int riemann_connect(struct riemann_host *host) /* {{{ */ { int e; struct addrinfo *ai, *res, hints; @@ -160,11 +161,10 @@ riemann_connect(struct riemann_host *host) return -1; } return 0; -} +} /* }}} int riemann_connect */ /* host->lock must be held when calling this function. */ -static int -riemann_disconnect (struct riemann_host *host) +static int riemann_disconnect (struct riemann_host *host) /* {{{ */ { if ((host->flags & F_CONNECT) == 0) return (0); @@ -174,31 +174,25 @@ riemann_disconnect (struct riemann_host *host) host->flags &= ~F_CONNECT; return (0); -} +} /* }}} int riemann_disconnect */ -static int -riemann_send(struct riemann_host *host, Msg const *msg) +static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */ { - u_char *buffer; + int status = 0; + u_char *buffer = NULL; size_t buffer_len; - int status; - - pthread_mutex_lock (&host->lock); status = riemann_connect (host); if (status != 0) - { - pthread_mutex_unlock (&host->lock); return status; - } buffer_len = msg__get_packed_size(msg); + if (host->use_tcp) buffer_len += 4; buffer = malloc (buffer_len); if (buffer == NULL) { - pthread_mutex_unlock (&host->lock); ERROR ("write_riemann plugin: malloc failed."); return ENOMEM; } @@ -219,10 +213,6 @@ riemann_send(struct riemann_host *host, Msg const *msg) if (status != 0) { char errbuf[1024]; - - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s", (host->node != NULL) ? host->node : RIEMANN_HOST, (host->service != NULL) ? host->service : RIEMANN_PORT, @@ -231,17 +221,94 @@ riemann_send(struct riemann_host *host, Msg const *msg) return -1; } - pthread_mutex_unlock (&host->lock); sfree (buffer); return 0; -} +} /* }}} int riemann_send_msg */ + +static int riemann_recv_ack(struct riemann_host *host) /* {{{ */ +{ + int status = 0; + Msg *msg = NULL; + uint32_t header; + + status = (int) sread (host->s, &header, 4); + + if (status != 0) + return -1; + + size_t size = ntohl(header); + + // Buffer on the stack since acknowledges are typically small. + u_char buffer[size]; + memset (buffer, 0, size); + + status = (int) sread (host->s, buffer, size); + + if (status != 0) + return status; + + msg = msg__unpack (NULL, size, buffer); + + if (msg == NULL) + return -1; + + if (!msg->ok) + { + ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s", + (host->node != NULL) ? host->node : RIEMANN_HOST, + (host->service != NULL) ? host->service : RIEMANN_PORT, + msg->error); + + msg__free_unpacked(msg, NULL); + return -1; + } + + msg__free_unpacked (msg, NULL); + return 0; +} /* }}} int riemann_recv_ack */ + +/** + * Function to send messages (Msg) to riemann. + * + * Acquires the host lock, disconnects on errors. + */ +static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */ +{ + int status = 0; + pthread_mutex_lock (&host->lock); + + status = riemann_send_msg(host, msg); + if (status != 0) { + riemann_disconnect (host); + pthread_mutex_unlock (&host->lock); + return status; + } + + /* + * For TCP we need to receive message acknowledgemenent. + */ + if (host->use_tcp) + { + status = riemann_recv_ack(host); + + if (status != 0) + { + riemann_disconnect (host); + pthread_mutex_unlock (&host->lock); + return status; + } + } + + pthread_mutex_unlock (&host->lock); + return 0; +} /* }}} int riemann_send */ static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */ { return (strarray_add (&event->tags, &event->n_tags, tag)); } /* }}} int riemann_event_add_tag */ -static int riemann_event_add_attribute (Event *event, /* {{{ */ +static int riemann_event_add_attribute(Event *event, /* {{{ */ char const *key, char const *value) { Attribute **new_attributes; @@ -274,7 +341,7 @@ static int riemann_event_add_attribute (Event *event, /* {{{ */ return (0); } /* }}} int riemann_event_add_attribute */ -static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */ +static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */ notification_t const *n) { Msg *msg; @@ -368,7 +435,7 @@ static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ return (msg); } /* }}} Msg *riemann_notification_to_protobuf */ -static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */ +static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */ data_set_t const *ds, value_list_t const *vl, size_t index, gauge_t const *rates) @@ -376,6 +443,7 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ Event *event; char name_buffer[5 * DATA_MAX_NAME_LEN]; char service_buffer[6 * DATA_MAX_NAME_LEN]; + double ttl; int i; event = malloc (sizeof (*event)); @@ -390,7 +458,9 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ event->host = strdup (vl->host); event->time = CDTIME_T_TO_TIME_T (vl->time); event->has_time = 1; - event->ttl = CDTIME_T_TO_TIME_T (2 * vl->interval); + + ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor; + event->ttl = (float) ttl; event->has_ttl = 1; riemann_event_add_attribute (event, "plugin", vl->plugin); @@ -466,7 +536,7 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ return (event); } /* }}} Event *riemann_value_to_protobuf */ -static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ +static Msg *riemann_value_list_to_protobuf(struct riemann_host const *host, /* {{{ */ data_set_t const *ds, value_list_t const *vl) { @@ -521,8 +591,7 @@ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* return (msg); } /* }}} Msg *riemann_value_list_to_protobuf */ -static int -riemann_notification(const notification_t *n, user_data_t *ud) +static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */ { int status; struct riemann_host *host = ud->data; @@ -541,8 +610,7 @@ riemann_notification(const notification_t *n, user_data_t *ud) return (status); } /* }}} int riemann_notification */ -static int -riemann_write(const data_set_t *ds, +static int riemann_write(const data_set_t *ds, /* {{{ */ const value_list_t *vl, user_data_t *ud) { @@ -561,10 +629,9 @@ riemann_write(const data_set_t *ds, riemann_msg_protobuf_free (msg); return status; -} +} /* }}} int riemann_write */ -static void -riemann_free(void *p) +static void riemann_free(void *p) /* {{{ */ { struct riemann_host *host = p; @@ -585,10 +652,9 @@ riemann_free(void *p) sfree(host->service); pthread_mutex_destroy (&host->lock); sfree(host); -} +} /* }}} void riemann_free */ -static int -riemann_config_node(oconfig_item_t *ci) +static int riemann_config_node(oconfig_item_t *ci) /* {{{ */ { struct riemann_host *host = NULL; int status = 0; @@ -608,6 +674,7 @@ riemann_config_node(oconfig_item_t *ci) host->store_rates = 1; host->always_append_ds = 0; host->use_tcp = 0; + host->ttl_factor = RIEMANN_TTL_FACTOR; status = cf_util_get_string (ci, &host->name); if (status != 0) { @@ -667,6 +734,33 @@ riemann_config_node(oconfig_item_t *ci) &host->always_append_ds); if (status != 0) break; + } else if (strcasecmp ("TTLFactor", child->key) == 0) { + double tmp = NAN; + status = cf_util_get_double (child, &tmp); + if (status != 0) + break; + if (tmp >= 2.0) { + host->ttl_factor = tmp; + } else if (tmp >= 1.0) { + NOTICE ("write_riemann plugin: The configured " + "TTLFactor is very small " + "(%.1f). A value of 2.0 or " + "greater is recommended.", + tmp); + host->ttl_factor = tmp; + } else if (tmp > 0.0) { + WARNING ("write_riemann plugin: The configured " + "TTLFactor is too small to be " + "useful (%.1f). I'll use it " + "since the user knows best, " + "but under protest.", + tmp); + host->ttl_factor = tmp; + } else { /* zero, negative and NAN */ + ERROR ("write_riemann plugin: The configured " + "TTLFactor is invalid (%.1f).", + tmp); + } } else { WARNING("write_riemann plugin: ignoring unknown config " "option: \"%s\"", child->key); @@ -716,10 +810,9 @@ riemann_config_node(oconfig_item_t *ci) pthread_mutex_unlock (&host->lock); return status; -} +} /* }}} int riemann_config_node */ -static int -riemann_config(oconfig_item_t *ci) +static int riemann_config(oconfig_item_t *ci) /* {{{ */ { int i; oconfig_item_t *child; @@ -746,10 +839,9 @@ riemann_config(oconfig_item_t *ci) } } return (0); -} +} /* }}} int riemann_config */ -void -module_register(void) +void module_register(void) { plugin_register_complex_config ("write_riemann", riemann_config); }