X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=89284c81962ea127d4c901ee0c64e7aceb13275a;hb=3427c2e266c04d67848bda913caa730a395c7295;hp=f0abd44b461a38c5c7fade0b1e96b92f11897ec7;hpb=a898c17330d9a2039bcdb8f7e6dbedba516a6cd8;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index f0abd44b..89284c81 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -1,7 +1,7 @@ /** * collectd - src/amqp.c - * Copyright (C) 2009 Sebastien Pahl - * Copyright (C) 2010 Florian Forster + * Copyright (C) 2009 Sebastien Pahl + * Copyright (C) 2010-2012 Florian Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -178,8 +178,13 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: +#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO if (r.library_errno) return (sstrerror (r.library_errno, buffer, buffer_size)); +#else + if (r.library_error) + return (sstrerror (r.library_error, buffer, buffer_size)); +#endif else sstrncpy (buffer, "End of stream", sizeof (buffer)); break; @@ -216,6 +221,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (buffer); } /* }}} char *camqp_strerror */ +#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ { amqp_exchange_declare_ok_t *ed_ret; @@ -246,6 +252,46 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_create_exchange */ +#else +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + amqp_table_t argument_table; + struct amqp_table_entry_t_ argument_table_entries[1]; + + if (conf->exchange_type == NULL) + return (0); + + /* Valid arguments: "auto_delete", "internal" */ + argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries); + argument_table.entries = argument_table_entries; + argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete"); + argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN; + argument_table_entries[0].value.value.boolean = 1; + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* arguments = */ argument_table); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ +#endif static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ { @@ -316,7 +362,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ /* consumer_tag = */ AMQP_EMPTY_BYTES, /* no_local = */ 0, /* no_ack = */ 1, - /* exclusive = */ 0); + /* exclusive = */ 0, + /* arguments = */ AMQP_EMPTY_TABLE + ); if ((cm_ret == NULL) && camqp_is_error (conf)) { char errbuf[1024]; @@ -550,19 +598,25 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ status = camqp_connect (conf); if (status != 0) { + struct timespec ts_interval; ERROR ("amqp plugin: camqp_connect failed. " - "Will sleep for %i seconds.", interval_g); - sleep (interval_g); + "Will sleep for %.3f seconds.", + CDTIME_T_TO_DOUBLE (interval_g)); + CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + nanosleep (&ts_interval, /* remaining = */ NULL); continue; } status = amqp_simple_wait_frame (conf->connection, &frame); if (status < 0) { + struct timespec ts_interval; ERROR ("amqp plugin: amqp_simple_wait_frame failed. " - "Will sleep for %i seconds.", interval_g); + "Will sleep for %.3f seconds.", + CDTIME_T_TO_DOUBLE (interval_g)); camqp_close_connection (conf); - sleep (interval_g); + CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -587,6 +641,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_free (conf); pthread_exit (NULL); + return (NULL); } /* }}} void *camqp_subscribe_thread */ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */