4 ** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
6 ** Permission is hereby granted, free of charge, to any person
7 ** obtaining a copy of this software and associated documentation
8 ** files (the "Software"), to deal in the Software without
9 ** restriction, including without limitation the rights to use,
10 ** copy, modify, merge, publish, distribute, sublicense, and/or sell
11 ** copies of the Software, and to permit persons to whom the
12 ** Software is furnished to do so, subject to the following
15 ** The above copyright notice and this permission notice shall be
16 ** included in all copies or substantial portions of the Software.
18 ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 ** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 ** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 ** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 ** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 ** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 ** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 ** OTHER DEALINGS IN THE SOFTWARE.
38 #include "utils_format_json.h"
41 #include <amqp_framing.h>
/* Connection settings. They start out as NULL; config() fills them in
 * through config_set(). */
44 static char *host = NULL;
45 static char *vhost = NULL;
46 static char *user = NULL;
47 static char *password = NULL;
48 static char *exchange = NULL;
49 static char *routingkey = NULL;
/* Shared broker connection. While it is NULL, amqp_write_locked() sets up
 * a connection before publishing. */
51 static amqp_connection_state_t amqp_conn = NULL;
/* Held by amqp_write() around amqp_write_locked() and by shutdown(). */
52 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
/* Configuration keys passed to plugin_register_config() in
 * module_register(). */
54 static const char *config_keys[] =
/* Size of config_keys as computed by STATIC_ARRAY_SIZE; passed along with
 * the array at registration time. */
65 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/* Store a private copy of value in *var.
 * The copy is made with strdup(); a NULL result means the allocation
 * failed.
 * NOTE(review): confirm that a previous *var is released before it is
 * overwritten, otherwise a repeated key would leak the old string. */
67 static int config_set(char **var, const char *value)
70 if ((*var = strdup(value)) == NULL)
/* Configuration callback registered in module_register().
 * key is compared case-insensitively (strcasecmp). The keys host, vhost,
 * user, password, exchange and routingkey are copied into the global of the
 * same name through config_set(), and the result of config_set() is
 * returned. The key port is converted with service_name_to_port_number(). */
75 static int config(const char *key, const char *value)
77 if (strcasecmp(key, "host") == 0)
78 return (config_set(&host, value));
79 else if(strcasecmp(key, "port") == 0)
/* The value is handed to service_name_to_port_number() for conversion. */
83 tmp = service_name_to_port_number (value);
/* NOTE(review): presumably reached only when the conversion failed -
 * confirm the guarding condition. */
86 ERROR ("AMQP plugin: Cannot parse `%s' as a "
87 "service name (port number).", value);
94 else if (strcasecmp(key, "vhost") == 0)
95 return (config_set(&vhost, value));
96 else if (strcasecmp(key, "user") == 0)
97 return (config_set(&user, value));
98 else if (strcasecmp(key, "password") == 0)
99 return (config_set(&password, value));
100 else if (strcasecmp(key, "exchange") == 0)
101 return (config_set(&exchange, value));
102 else if (strcasecmp(key, "routingkey") == 0)
103 return (config_set(&routingkey, value));
/* Publish buffer (a NUL-terminated string) to exchange using routingkey.
 * amqp_write() calls this with amqp_conn_lock held. When amqp_conn is NULL
 * a connection is set up first: a new connection object, a socket to
 * host:port, a PLAIN login on vhost and finally channel 1. */
107 static int amqp_write_locked (const char *buffer)
109 amqp_rpc_reply_t reply;
110 amqp_basic_properties_t props;
/* Connect lazily: nothing is opened until a write finds no connection. */
113 if (amqp_conn == NULL)
117 amqp_conn = amqp_new_connection ();
118 if (amqp_conn == NULL)
120 ERROR ("amqp plugin: amqp_new_connection failed.");
124 sockfd = amqp_open_socket (host, port);
/* sockfd is negated to obtain the code that is passed to sstrerror(). */
128 status = (-1) * sockfd;
129 ERROR ("amqp plugin: amqp_open_socket failed: %s",
130 sstrerror (status, errbuf, sizeof (errbuf)));
/* No socket was attached yet, so only the connection object is released. */
131 amqp_destroy_connection(amqp_conn);
136 amqp_set_sockfd (amqp_conn, sockfd);
138 reply = amqp_login(amqp_conn, vhost,
139 /* channel max = */ 0,
140 /* frame max = */ 131072,
142 /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
143 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
145 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
147 amqp_destroy_connection(amqp_conn);
/* NOTE(review): the return value of amqp_channel_open() is discarded and
 * reply still holds the amqp_login() result that was tested above, so the
 * check below does not look at the outcome of opening the channel.
 * Fetching the status with reply = amqp_get_rpc_reply (amqp_conn) right
 * after the call looks like the intended fix - confirm against the
 * rabbitmq-c version in use. */
153 amqp_channel_open (amqp_conn, /* channel = */ 1);
154 /* FIXME: Is checking "reply.reply_type" really correct here? How does
155 * it get set? --octo */
156 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
158 ERROR ("amqp plugin: amqp_channel_open failed.");
159 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
160 amqp_destroy_connection(amqp_conn);
166 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
167 "on %s:%i.", vhost, host, port);
168 } /* if (amqp_conn == NULL) */
/* Only the content type and the delivery mode are flagged as present in
 * the message properties. */
170 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
171 props.content_type = amqp_cstring_bytes("application/json");
172 props.delivery_mode = 2; /* persistent delivery mode */
174 status = amqp_basic_publish(amqp_conn,
176 amqp_cstring_bytes(exchange),
177 amqp_cstring_bytes(routingkey),
181 amqp_cstring_bytes(buffer));
/* NOTE(review): presumably the publish-failure path - the channel and the
 * connection are closed and the connection object is destroyed. Confirm
 * the guarding condition on status. */
186 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
189 sockfd = amqp_get_sockfd (amqp_conn);
190 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
191 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
192 amqp_destroy_connection (amqp_conn);
198 } /* int amqp_write_locked */
/* Write callback registered in module_register(): formats one value list
 * as JSON and publishes it through amqp_write_locked().
 * ds and vl are tested for NULL up front. user_data is marked unused. */
200 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
201 __attribute__((unused)) user_data_t *user_data)
208 if ((ds == NULL) || (vl == NULL))
/* Start from an all-zero buffer; bfree is set to the full buffer size. */
211 memset (buffer, 0, sizeof (buffer));
212 bfree = sizeof (buffer);
/* Build the JSON text in three steps: initialize, append the value list,
 * finalize. The return values of these calls are not stored here. */
215 format_json_initialize(buffer, &bfill, &bfree);
216 /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
217 format_json_value_list(buffer, &bfill, &bfree, ds, vl, /* rates = */ 0);
218 format_json_finalize(buffer, &bfill, &bfree);
/* amqp_write_locked() works on the shared amqp_conn, so it runs with
 * amqp_conn_lock held. */
220 pthread_mutex_lock (&amqp_conn_lock);
221 status = amqp_write_locked (buffer);
222 pthread_mutex_unlock (&amqp_conn_lock);
225 } /* int amqp_write */
/* Shutdown callback registered in module_register().
 * With amqp_conn_lock held and a connection present, channel 1 and the
 * connection are closed and the connection object is destroyed. */
227 static int shutdown(void)
229 pthread_mutex_lock (&amqp_conn_lock);
230 if (amqp_conn != NULL)
/* The socket descriptor is read before the connection object is destroyed. */
234 sockfd = amqp_get_sockfd (amqp_conn);
235 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
236 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
237 amqp_destroy_connection (amqp_conn);
241 pthread_mutex_unlock (&amqp_conn_lock);
/* Registers the config, write and shutdown callbacks of this plugin under
 * the name amqp. */
253 void module_register(void)
255 plugin_register_config("amqp", config, config_keys, config_keys_num);
256 plugin_register_write("amqp", amqp_write, NULL);
257 plugin_register_shutdown("amqp", shutdown);
260 /* vim: set sw=4 sts=4 et : */