char *ret;
if ((in == NULL) || (in->bytes == NULL))
- return (NULL);
+ return NULL;
ret = malloc(in->len + 1);
if (ret == NULL)
- return (NULL);
+ return NULL;
memcpy(ret, in->bytes, in->len);
ret[in->len] = 0;
- return (ret);
+ return ret;
} /* }}} char *camqp_bytes_cstring */
static _Bool camqp_is_error(camqp_config_t *conf) /* {{{ */
r = amqp_get_rpc_reply(conf->connection);
if (r.reply_type == AMQP_RESPONSE_NORMAL)
- return (0);
+ return 0;
- return (1);
+ return 1;
} /* }}} _Bool camqp_is_error */
static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
if (r.library_errno)
- return (sstrerror(r.library_errno, buffer, buffer_size));
+ return sstrerror(r.library_errno, buffer, buffer_size);
#else
if (r.library_error)
- return (sstrerror(r.library_error, buffer, buffer_size));
+ return sstrerror(r.library_error, buffer, buffer_size);
#endif
else
sstrncpy(buffer, "End of stream", buffer_size);
if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
- ssnprintf(buffer, buffer_size, "Server connection error %d: %s",
- m->reply_code, tmp);
+ snprintf(buffer, buffer_size, "Server connection error %d: %s",
+ m->reply_code, tmp);
sfree(tmp);
} else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
amqp_channel_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
- ssnprintf(buffer, buffer_size, "Server channel error %d: %s",
- m->reply_code, tmp);
+ snprintf(buffer, buffer_size, "Server channel error %d: %s",
+ m->reply_code, tmp);
sfree(tmp);
} else {
- ssnprintf(buffer, buffer_size, "Server error method %#" PRIx32,
- r.reply.id);
+ snprintf(buffer, buffer_size, "Server error method %#" PRIx32,
+ r.reply.id);
}
break;
default:
- ssnprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
+ snprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
}
- return (buffer);
+ return buffer;
} /* }}} char *camqp_strerror */
#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
amqp_exchange_declare_ok_t *ed_ret;
if (conf->exchange_type == NULL)
- return (0);
+ return 0;
ed_ret = amqp_exchange_declare(
conf->connection,
ERROR("amqp plugin: amqp_exchange_declare failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
- return (-1);
+ return -1;
}
INFO("amqp plugin: Successfully created exchange \"%s\" "
"with type \"%s\".",
conf->exchange, conf->exchange_type);
- return (0);
+ return 0;
} /* }}} int camqp_create_exchange */
#else
static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
struct amqp_table_entry_t_ argument_table_entries[1];
if (conf->exchange_type == NULL)
- return (0);
+ return 0;
/* Valid arguments: "auto_delete", "internal" */
argument_table.num_entries = STATIC_ARRAY_SIZE(argument_table_entries);
ERROR("amqp plugin: amqp_exchange_declare failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
- return (-1);
+ return -1;
}
INFO("amqp plugin: Successfully created exchange \"%s\" "
"with type \"%s\".",
conf->exchange, conf->exchange_type);
- return (0);
+ return 0;
} /* }}} int camqp_create_exchange */
#endif
if (qd_ret == NULL) {
ERROR("amqp plugin: amqp_queue_declare failed.");
camqp_close_connection(conf);
- return (-1);
+ return -1;
}
if (conf->queue == NULL) {
if (conf->queue == NULL) {
ERROR("amqp plugin: camqp_bytes_cstring failed.");
camqp_close_connection(conf);
- return (-1);
+ return -1;
}
INFO("amqp plugin: Created queue \"%s\".", conf->queue);
ERROR("amqp plugin: amqp_queue_bind failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
- return (-1);
+ return -1;
}
DEBUG("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
ERROR("amqp plugin: amqp_basic_consume failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
} /* }}} int camqp_setup_queue */
static int camqp_connect(camqp_config_t *conf) /* {{{ */
#endif
if (conf->connection != NULL)
- return (0);
+ return 0;
time_t now = time(NULL);
if (now < (last_connect_time + conf->connection_retry_delay)) {
DEBUG("amqp plugin: skipping connection retry, "
"ConnectionRetryDelay: %d",
conf->connection_retry_delay);
- return (1);
+ return 1;
} else {
DEBUG("amqp plugin: retrying connection");
last_connect_time = now;
conf->connection = amqp_new_connection();
if (conf->connection == NULL) {
ERROR("amqp plugin: amqp_new_connection failed.");
- return (ENOMEM);
+ return ENOMEM;
}
#ifdef HAVE_AMQP_TCP_SOCKET
ERROR("amqp plugin: amqp_tcp_socket_new failed.");
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
- return (ENOMEM);
+ return ENOMEM;
}
status = amqp_socket_open(socket, CONF(conf, host), conf->port);
sstrerror(status, errbuf, sizeof(errbuf)));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
- return (status);
+ return status;
}
#else /* HAVE_AMQP_TCP_SOCKET */
#define CLOSE_SOCKET() close(sockfd)
sstrerror(status, errbuf, sizeof(errbuf)));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
- return (status);
+ return status;
}
amqp_set_sockfd(conf->connection, sockfd);
#endif
amqp_destroy_connection(conf->connection);
CLOSE_SOCKET();
conf->connection = NULL;
- return (1);
+ return 1;
}
amqp_channel_open(conf->connection, /* channel = */ 1);
amqp_destroy_connection(conf->connection);
CLOSE_SOCKET();
conf->connection = NULL;
- return (1);
+ return 1;
}
INFO("amqp plugin: Successfully opened connection to vhost \"%s\" "
status = camqp_create_exchange(conf);
if (status != 0)
- return (status);
+ return status;
if (!conf->publish)
- return (camqp_setup_queue(conf));
- return (0);
+ return camqp_setup_queue(conf);
+ return 0;
} /* }}} int camqp_connect */
static int camqp_shutdown(void) /* {{{ */
DEBUG("amqp plugin: All subscriber threads exited.");
- return (0);
+ return 0;
} /* }}} int camqp_shutdown */
/*
ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
sstrerror(status, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
- return (status);
+ return status;
}
if (frame.frame_type != AMQP_FRAME_BODY) {
NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
- return (-1);
+ return -1;
}
if ((body_size - received) < frame.payload.body_fragment.len) {
WARNING("amqp plugin: Body is larger than indicated by header.");
- return (-1);
+ return -1;
}
memcpy(body_ptr, frame.payload.body_fragment.bytes,
status = cmd_handle_putval(stderr, body);
if (status != 0)
ERROR("amqp plugin: cmd_handle_putval failed with status %i.", status);
- return (status);
+ return status;
} else if (strcasecmp("application/json", content_type) == 0) {
ERROR("amqp plugin: camqp_read_body: Parsing JSON data has not "
"been implemented yet. FIXME!");
- return (0);
+ return 0;
} else {
ERROR("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
content_type);
- return (EINVAL);
+ return EINVAL;
}
/* not reached */
- return (0);
+ return 0;
} /* }}} int camqp_read_body */
static int camqp_read_header(camqp_config_t *conf) /* {{{ */
ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
sstrerror(status, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
- return (status);
+ return status;
}
if (frame.frame_type != AMQP_FRAME_HEADER) {
NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
- return (-1);
+ return -1;
}
properties = frame.payload.properties.decoded;
content_type = camqp_bytes_cstring(&properties->content_type);
if (content_type == NULL) {
ERROR("amqp plugin: Unable to determine content type.");
- return (-1);
+ return -1;
}
status = camqp_read_body(conf, (size_t)frame.payload.properties.body_size,
content_type);
sfree(content_type);
- return (status);
+ return status;
} /* }}} int camqp_read_header */
static void *camqp_subscribe_thread(void *user_data) /* {{{ */
camqp_config_free(conf);
pthread_exit(NULL);
- return (NULL);
+ return NULL;
} /* }}} void *camqp_subscribe_thread */
static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
if (tmp == NULL) {
ERROR("amqp plugin: realloc failed.");
sfree(subscriber_threads);
- return (ENOMEM);
+ return ENOMEM;
}
subscriber_threads = tmp;
tmp = subscriber_threads + subscriber_threads_num;
char errbuf[1024];
ERROR("amqp plugin: pthread_create failed: %s",
sstrerror(status, errbuf, sizeof(errbuf)));
- return (status);
+ return status;
}
subscriber_threads_num++;
- return (0);
+ return 0;
} /* }}} int camqp_subscribe_init */
/*
status = camqp_connect(conf);
if (status != 0)
- return (status);
+ return status;
amqp_basic_properties_t props = {._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
camqp_close_connection(conf);
}
- return (status);
+ return status;
} /* }}} int camqp_write_locked */
static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
int status;
if ((ds == NULL) || (vl == NULL) || (conf == NULL))
- return (EINVAL);
+ return EINVAL;
if (conf->routing_key != NULL) {
sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
} else {
- ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
- vl->host, vl->plugin, vl->plugin_instance, vl->type,
- vl->type_instance);
+ snprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host, vl->plugin, vl->plugin_instance, vl->type,
+ vl->type_instance);
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
if (status != 0) {
ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
- return (status);
+ return status;
}
} else if (conf->format == CAMQP_FORMAT_JSON) {
size_t bfree = sizeof(buffer);
conf->postfix, conf->escape_char, conf->graphite_flags);
if (status != 0) {
ERROR("amqp plugin: format_graphite failed with status %i.", status);
- return (status);
+ return status;
}
} else {
ERROR("amqp plugin: Invalid format (%i).", conf->format);
- return (-1);
+ return -1;
}
pthread_mutex_lock(&conf->lock);
status = camqp_write_locked(conf, buffer, routing_key);
pthread_mutex_unlock(&conf->lock);
- return (status);
+ return status;
} /* }}} int camqp_write */
/*
string = NULL;
status = cf_util_get_string(ci, &string);
if (status != 0)
- return (status);
+ return status;
assert(string != NULL);
if (strcasecmp("Command", string) == 0)
free(string);
- return (0);
+ return 0;
} /* }}} int config_set_string */
static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
conf = calloc(1, sizeof(*conf));
if (conf == NULL) {
ERROR("amqp plugin: calloc failed.");
- return (ENOMEM);
+ return ENOMEM;
}
/* Initialize "conf" {{{ */
status = cf_util_get_string(ci, &conf->name);
if (status != 0) {
sfree(conf);
- return (status);
+ return status;
}
for (int i = 0; i < ci->children_num; i++) {
if (status != 0) {
camqp_config_free(conf);
- return (status);
+ return status;
}
if (conf->exchange != NULL) {
if (publish) {
char cbname[128];
- ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
+ snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
status = plugin_register_write(
cbname, camqp_write, &(user_data_t){
});
if (status != 0) {
camqp_config_free(conf);
- return (status);
+ return status;
}
} else {
status = camqp_subscribe_init(conf);
if (status != 0) {
camqp_config_free(conf);
- return (status);
+ return status;
}
}
- return (0);
+ return 0;
} /* }}} int camqp_config_connection */
static int camqp_config(oconfig_item_t *ci) /* {{{ */
child->key);
} /* for (ci->children_num) */
- return (0);
+ return 0;
} /* }}} int camqp_config */
void module_register(void) {
plugin_register_complex_config("amqp", camqp_config);
plugin_register_shutdown("amqp", camqp_shutdown);
} /* void module_register */
-
-/* vim: set sw=4 sts=4 et fdm=marker : */