Merge branch 'collectd-5.7'
[collectd.git] / src / write_mongodb.c
1 /**
2  * collectd - src/write_mongodb.c
3  * Copyright (C) 2010-2013  Florian Forster
4  * Copyright (C) 2010       Akkarit Sangpetch
5  * Copyright (C) 2012       Chris Lundquist
6  * Copyright (C) 2017       Saikrishna Arcot
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a
9  * copy of this software and associated documentation files (the "Software"),
10  * to deal in the Software without restriction, including without limitation
11  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
12  * and/or sell copies of the Software, and to permit persons to whom the
13  * Software is furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24  * DEALINGS IN THE SOFTWARE.
25  *
26  * Authors:
27  *   Florian Forster <octo at collectd.org>
28  *   Akkarit Sangpetch <asangpet at andrew.cmu.edu>
29  *   Chris Lundquist <clundquist at bluebox.net>
30  *   Saikrishna Arcot <saiarcot895 at gmail.com>
31  **/
32
33 #include "collectd.h"
34
35 #include "common.h"
36 #include "plugin.h"
37 #include "utils_cache.h"
38
39 #include <mongoc.h>
40
41 struct wm_node_s {
42   char name[DATA_MAX_NAME_LEN];
43
44   char *host;
45   int port;
46   int timeout;
47
48   /* Authentication information */
49   char *db;
50   char *user;
51   char *passwd;
52
53   _Bool store_rates;
54   _Bool connected;
55
56   mongoc_client_t *client;
57   mongoc_database_t *database;
58   pthread_mutex_t lock;
59 };
60 typedef struct wm_node_s wm_node_t;
61
62 /*
63  * Functions
64  */
65 static bson_t *wm_create_bson(const data_set_t *ds, /* {{{ */
66                               const value_list_t *vl, _Bool store_rates) {
67   bson_t *ret;
68   bson_t subarray;
69   gauge_t *rates;
70
71   ret = bson_new();
72   if (!ret) {
73     ERROR("write_mongodb plugin: bson_new failed.");
74     return NULL;
75   }
76
77   if (store_rates) {
78     rates = uc_get_rate(ds, vl);
79     if (rates == NULL) {
80       ERROR("write_mongodb plugin: uc_get_rate() failed.");
81       bson_free(ret);
82       return NULL;
83     }
84   } else {
85     rates = NULL;
86   }
87
88   BSON_APPEND_DATE_TIME(ret, "timestamp", CDTIME_T_TO_MS(vl->time));
89   BSON_APPEND_UTF8(ret, "host", vl->host);
90   BSON_APPEND_UTF8(ret, "plugin", vl->plugin);
91   BSON_APPEND_UTF8(ret, "plugin_instance", vl->plugin_instance);
92   BSON_APPEND_UTF8(ret, "type", vl->type);
93   BSON_APPEND_UTF8(ret, "type_instance", vl->type_instance);
94
95   BSON_APPEND_ARRAY_BEGIN(ret, "values", &subarray); /* {{{ */
96   for (int i = 0; i < ds->ds_num; i++) {
97     char key[16];
98
99     ssnprintf(key, sizeof(key), "%i", i);
100
101     if (ds->ds[i].type == DS_TYPE_GAUGE)
102       BSON_APPEND_DOUBLE(&subarray, key, vl->values[i].gauge);
103     else if (store_rates)
104       BSON_APPEND_DOUBLE(&subarray, key, (double)rates[i]);
105     else if (ds->ds[i].type == DS_TYPE_COUNTER)
106       BSON_APPEND_INT64(&subarray, key, vl->values[i].counter);
107     else if (ds->ds[i].type == DS_TYPE_DERIVE)
108       BSON_APPEND_INT64(&subarray, key, vl->values[i].derive);
109     else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
110       BSON_APPEND_INT64(&subarray, key, vl->values[i].absolute);
111     else {
112       ERROR("write_mongodb plugin: Unknown ds_type %d for index %d",
113             ds->ds[i].type, i);
114       bson_free(ret);
115       return NULL;
116     }
117   }
118   bson_append_array_end(ret, &subarray); /* }}} values */
119
120   BSON_APPEND_ARRAY_BEGIN(ret, "dstypes", &subarray); /* {{{ */
121   for (int i = 0; i < ds->ds_num; i++) {
122     char key[16];
123
124     ssnprintf(key, sizeof(key), "%i", i);
125
126     if (store_rates)
127       BSON_APPEND_UTF8(&subarray, key, "gauge");
128     else
129       BSON_APPEND_UTF8(&subarray, key, DS_TYPE_TO_STRING(ds->ds[i].type));
130   }
131   bson_append_array_end(ret, &subarray); /* }}} dstypes */
132
133   BSON_APPEND_ARRAY_BEGIN(ret, "dsnames", &subarray); /* {{{ */
134   for (int i = 0; i < ds->ds_num; i++) {
135     char key[16];
136
137     ssnprintf(key, sizeof(key), "%i", i);
138     BSON_APPEND_UTF8(&subarray, key, ds->ds[i].name);
139   }
140   bson_append_array_end(ret, &subarray); /* }}} dsnames */
141
142   sfree(rates);
143
144   size_t error_location;
145   if (!bson_validate(ret, BSON_VALIDATE_UTF8, &error_location)) {
146     ERROR("write_mongodb plugin: Error in generated BSON document "
147         "at byte %zu", error_location);
148     bson_free(ret);
149     return NULL;
150   }
151
152   return ret;
153 } /* }}} bson *wm_create_bson */
154
155 static int wm_initialize(wm_node_t *node) /* {{{ */
156 {
157   char *uri;
158   size_t uri_length;
159   char const *format_string;
160
161   if (node->connected) {
162     return 0;
163   }
164
165   INFO("write_mongodb plugin: Connecting to [%s]:%i",
166        (node->host != NULL) ? node->host : "localhost",
167        (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT);
168
169   if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) {
170     format_string = "mongodb://%s:%s@%s:%d/?authSource=%s";
171     uri_length = strlen(format_string) + strlen(node->user) +
172                  strlen(node->passwd) + strlen(node->host) + 5 +
173                  strlen(node->db) + 1;
174     if ((uri = calloc(1, uri_length)) == NULL) {
175       ERROR("write_mongodb plugin: Not enough memory to assemble "
176             "authentication string.");
177       mongoc_client_destroy(node->client);
178       node->client = NULL;
179       node->connected = 0;
180       return -1;
181     }
182     ssnprintf(uri, uri_length, format_string, node->user, node->passwd,
183               node->host, node->port, node->db);
184
185     node->client = mongoc_client_new(uri);
186     if (!node->client) {
187       ERROR("write_mongodb plugin: Authenticating to [%s]%i for database "
188             "\"%s\" as user \"%s\" failed.",
189             (node->host != NULL) ? node->host : "localhost",
190             (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT, node->db,
191             node->user);
192       node->connected = 0;
193       sfree(uri);
194       return -1;
195     }
196   } else {
197     format_string = "mongodb://%s:%d";
198     uri_length = strlen(format_string) + strlen(node->host) + 5 + 1;
199     if ((uri = calloc(1, uri_length)) == NULL) {
200       ERROR("write_mongodb plugin: Not enough memory to assemble "
201             "authentication string.");
202       mongoc_client_destroy(node->client);
203       node->client = NULL;
204       node->connected = 0;
205       return -1;
206     }
207     snprintf(uri, uri_length, format_string, node->host, node->port);
208
209     node->client = mongoc_client_new(uri);
210     if (!node->client) {
211       ERROR("write_mongodb plugin: Connecting to [%s]:%i failed.",
212             (node->host != NULL) ? node->host : "localhost",
213             (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT);
214       node->connected = 0;
215       sfree(uri);
216       return -1;
217     }
218   }
219   sfree(uri);
220
221   node->database = mongoc_client_get_database(node->client, "collectd");
222   if (!node->database) {
223     ERROR("write_mongodb plugin: error creating/getting database");
224     mongoc_client_destroy(node->client);
225     node->client = NULL;
226     node->connected = 0;
227     return -1;
228   }
229
230   node->connected = 1;
231   return 0;
232 } /* }}} int wm_initialize */
233
234 static int wm_write(const data_set_t *ds, /* {{{ */
235                     const value_list_t *vl, user_data_t *ud) {
236   wm_node_t *node = ud->data;
237   mongoc_collection_t *collection = NULL;
238   bson_t *bson_record;
239   bson_error_t error;
240   int status;
241
242   bson_record = wm_create_bson(ds, vl, node->store_rates);
243   if (!bson_record) {
244     ERROR("write_mongodb plugin: error making insert bson");
245     return -1;
246   }
247
248   pthread_mutex_lock(&node->lock);
249   if (wm_initialize(node) < 0) {
250     ERROR("write_mongodb plugin: error making connection to server");
251     pthread_mutex_unlock(&node->lock);
252     bson_free(bson_record);
253     return -1;
254   }
255
256   collection =
257       mongoc_client_get_collection(node->client, "collectd", vl->plugin);
258   if (!collection) {
259     ERROR("write_mongodb plugin: error creating/getting collection");
260     mongoc_database_destroy(node->database);
261     mongoc_client_destroy(node->client);
262     node->database = NULL;
263     node->client = NULL;
264     node->connected = 0;
265     pthread_mutex_unlock(&node->lock);
266     bson_free(bson_record);
267     return -1;
268   }
269
270   status = mongoc_collection_insert(collection, MONGOC_INSERT_NONE, bson_record,
271                                     NULL, &error);
272
273   if (!status) {
274     ERROR("write_mongodb plugin: error inserting record: %s", error.message);
275     mongoc_database_destroy(node->database);
276     mongoc_client_destroy(node->client);
277     node->database = NULL;
278     node->client = NULL;
279     node->connected = 0;
280     pthread_mutex_unlock(&node->lock);
281     bson_free(bson_record);
282     mongoc_collection_destroy(collection);
283     return -1;
284   }
285
286   /* free our resource as not to leak memory */
287   mongoc_collection_destroy(collection);
288
289   pthread_mutex_unlock(&node->lock);
290
291   bson_free(bson_record);
292
293   return 0;
294 } /* }}} int wm_write */
295
296 static void wm_config_free(void *ptr) /* {{{ */
297 {
298   wm_node_t *node = ptr;
299
300   if (node == NULL)
301     return;
302
303   mongoc_database_destroy(node->database);
304   mongoc_client_destroy(node->client);
305   node->database = NULL;
306   node->client = NULL;
307   node->connected = 0;
308
309   sfree(node->host);
310   sfree(node);
311 } /* }}} void wm_config_free */
312
313 static int wm_config_node(oconfig_item_t *ci) /* {{{ */
314 {
315   wm_node_t *node;
316   int status;
317
318   node = calloc(1, sizeof(*node));
319   if (node == NULL)
320     return (ENOMEM);
321   mongoc_init();
322   node->host = NULL;
323   node->store_rates = 1;
324   pthread_mutex_init(&node->lock, /* attr = */ NULL);
325
326   status = cf_util_get_string_buffer(ci, node->name, sizeof(node->name));
327
328   if (status != 0) {
329     sfree(node);
330     return (status);
331   }
332
333   for (int i = 0; i < ci->children_num; i++) {
334     oconfig_item_t *child = ci->children + i;
335
336     if (strcasecmp("Host", child->key) == 0)
337       status = cf_util_get_string(child, &node->host);
338     else if (strcasecmp("Port", child->key) == 0) {
339       status = cf_util_get_port_number(child);
340       if (status > 0) {
341         node->port = status;
342         status = 0;
343       }
344     } else if (strcasecmp("Timeout", child->key) == 0)
345       status = cf_util_get_int(child, &node->timeout);
346     else if (strcasecmp("StoreRates", child->key) == 0)
347       status = cf_util_get_boolean(child, &node->store_rates);
348     else if (strcasecmp("Database", child->key) == 0)
349       status = cf_util_get_string(child, &node->db);
350     else if (strcasecmp("User", child->key) == 0)
351       status = cf_util_get_string(child, &node->user);
352     else if (strcasecmp("Password", child->key) == 0)
353       status = cf_util_get_string(child, &node->passwd);
354     else
355       WARNING("write_mongodb plugin: Ignoring unknown config option \"%s\".",
356               child->key);
357
358     if (status != 0)
359       break;
360   } /* for (i = 0; i < ci->children_num; i++) */
361
362   if ((node->db != NULL) || (node->user != NULL) || (node->passwd != NULL)) {
363     if ((node->db == NULL) || (node->user == NULL) || (node->passwd == NULL)) {
364       WARNING(
365           "write_mongodb plugin: Authentication requires the "
366           "\"Database\", \"User\" and \"Password\" options to be specified, "
367           "but at last one of them is missing. Authentication will NOT be "
368           "used.");
369       sfree(node->db);
370       sfree(node->user);
371       sfree(node->passwd);
372     }
373   }
374
375   if (status == 0) {
376     char cb_name[DATA_MAX_NAME_LEN];
377
378     ssnprintf(cb_name, sizeof(cb_name), "write_mongodb/%s", node->name);
379
380     status = plugin_register_write(
381         cb_name, wm_write, &(user_data_t){
382                                .data = node, .free_func = wm_config_free,
383                            });
384     INFO("write_mongodb plugin: registered write plugin %s %d", cb_name,
385          status);
386   }
387
388   if (status != 0)
389     wm_config_free(node);
390
391   return (status);
392 } /* }}} int wm_config_node */
393
394 static int wm_config(oconfig_item_t *ci) /* {{{ */
395 {
396   for (int i = 0; i < ci->children_num; i++) {
397     oconfig_item_t *child = ci->children + i;
398
399     if (strcasecmp("Node", child->key) == 0)
400       wm_config_node(child);
401     else
402       WARNING("write_mongodb plugin: Ignoring unknown "
403               "configuration option \"%s\" at top level.",
404               child->key);
405   }
406
407   return (0);
408 } /* }}} int wm_config */
409
410 void module_register(void) {
411   plugin_register_complex_config("write_mongodb", wm_config);
412 }