Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / write_mongodb.c
1 /**
2  * collectd - src/write_mongodb.c
3  * Copyright (C) 2010-2013  Florian Forster
4  * Copyright (C) 2010       Akkarit Sangpetch
5  * Copyright (C) 2012       Chris Lundquist
6  * Copyright (C) 2017       Saikrishna Arcot
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a
9  * copy of this software and associated documentation files (the "Software"),
10  * to deal in the Software without restriction, including without limitation
11  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
12  * and/or sell copies of the Software, and to permit persons to whom the
13  * Software is furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24  * DEALINGS IN THE SOFTWARE.
25  *
26  * Authors:
27  *   Florian Forster <octo at collectd.org>
28  *   Akkarit Sangpetch <asangpet at andrew.cmu.edu>
29  *   Chris Lundquist <clundquist at bluebox.net>
30  *   Saikrishna Arcot <saiarcot895 at gmail.com>
31  **/
32
33 #include "collectd.h"
34
35 #include "common.h"
36 #include "plugin.h"
37 #include "utils_cache.h"
38
39 #include <mongoc.h>
40
41 struct wm_node_s {
42   char name[DATA_MAX_NAME_LEN];
43
44   char *host;
45   int port;
46   int timeout;
47
48   /* Authentication information */
49   char *db;
50   char *user;
51   char *passwd;
52
53   _Bool store_rates;
54   _Bool connected;
55
56   mongoc_client_t *client;
57   mongoc_database_t *database;
58   pthread_mutex_t lock;
59 };
60 typedef struct wm_node_s wm_node_t;
61
62 /*
63  * Functions
64  */
65 static bson_t *wm_create_bson(const data_set_t *ds, /* {{{ */
66                               const value_list_t *vl, _Bool store_rates) {
67   bson_t *ret;
68   bson_t subarray;
69   gauge_t *rates;
70
71   ret = bson_new();
72   if (!ret) {
73     ERROR("write_mongodb plugin: bson_new failed.");
74     return NULL;
75   }
76
77   if (store_rates) {
78     rates = uc_get_rate(ds, vl);
79     if (rates == NULL) {
80       ERROR("write_mongodb plugin: uc_get_rate() failed.");
81       bson_destroy(ret);
82       return NULL;
83     }
84   } else {
85     rates = NULL;
86   }
87
88   BSON_APPEND_DATE_TIME(ret, "timestamp", CDTIME_T_TO_MS(vl->time));
89   BSON_APPEND_UTF8(ret, "host", vl->host);
90   BSON_APPEND_UTF8(ret, "plugin", vl->plugin);
91   BSON_APPEND_UTF8(ret, "plugin_instance", vl->plugin_instance);
92   BSON_APPEND_UTF8(ret, "type", vl->type);
93   BSON_APPEND_UTF8(ret, "type_instance", vl->type_instance);
94
95   BSON_APPEND_ARRAY_BEGIN(ret, "values", &subarray); /* {{{ */
96   for (size_t i = 0; i < ds->ds_num; i++) {
97     char key[16];
98
99     snprintf(key, sizeof(key), "%zu", i);
100
101     if (ds->ds[i].type == DS_TYPE_GAUGE)
102       BSON_APPEND_DOUBLE(&subarray, key, vl->values[i].gauge);
103     else if (store_rates)
104       BSON_APPEND_DOUBLE(&subarray, key, (double)rates[i]);
105     else if (ds->ds[i].type == DS_TYPE_COUNTER)
106       BSON_APPEND_INT64(&subarray, key, vl->values[i].counter);
107     else if (ds->ds[i].type == DS_TYPE_DERIVE)
108       BSON_APPEND_INT64(&subarray, key, vl->values[i].derive);
109     else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
110       BSON_APPEND_INT64(&subarray, key, vl->values[i].absolute);
111     else {
112       ERROR("write_mongodb plugin: Unknown ds_type %d for index %zu",
113             ds->ds[i].type, i);
114       bson_destroy(ret);
115       return NULL;
116     }
117   }
118   bson_append_array_end(ret, &subarray); /* }}} values */
119
120   BSON_APPEND_ARRAY_BEGIN(ret, "dstypes", &subarray); /* {{{ */
121   for (size_t i = 0; i < ds->ds_num; i++) {
122     char key[16];
123
124     snprintf(key, sizeof(key), "%zu", i);
125
126     if (store_rates)
127       BSON_APPEND_UTF8(&subarray, key, "gauge");
128     else
129       BSON_APPEND_UTF8(&subarray, key, DS_TYPE_TO_STRING(ds->ds[i].type));
130   }
131   bson_append_array_end(ret, &subarray); /* }}} dstypes */
132
133   BSON_APPEND_ARRAY_BEGIN(ret, "dsnames", &subarray); /* {{{ */
134   for (size_t i = 0; i < ds->ds_num; i++) {
135     char key[16];
136
137     snprintf(key, sizeof(key), "%zu", i);
138     BSON_APPEND_UTF8(&subarray, key, ds->ds[i].name);
139   }
140   bson_append_array_end(ret, &subarray); /* }}} dsnames */
141
142   sfree(rates);
143
144   size_t error_location;
145   if (!bson_validate(ret, BSON_VALIDATE_UTF8, &error_location)) {
146     ERROR("write_mongodb plugin: Error in generated BSON document "
147           "at byte %zu",
148           error_location);
149     bson_destroy(ret);
150     return NULL;
151   }
152
153   return ret;
154 } /* }}} bson *wm_create_bson */
155
156 static int wm_initialize(wm_node_t *node) /* {{{ */
157 {
158   char *uri;
159
160   if (node->connected)
161     return 0;
162
163   INFO("write_mongodb plugin: Connecting to [%s]:%d", node->host, node->port);
164
165   if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) {
166     uri = ssnprintf_alloc("mongodb://%s:%s@%s:%d/?authSource=%s", node->user,
167                           node->passwd, node->host, node->port, node->db);
168     if (uri == NULL) {
169       ERROR("write_mongodb plugin: Not enough memory to assemble "
170             "authentication string.");
171       mongoc_client_destroy(node->client);
172       node->client = NULL;
173       node->connected = 0;
174       return -1;
175     }
176
177     node->client = mongoc_client_new(uri);
178     if (!node->client) {
179       ERROR("write_mongodb plugin: Authenticating to [%s]:%d for database "
180             "\"%s\" as user \"%s\" failed.",
181             node->host, node->port, node->db, node->user);
182       node->connected = 0;
183       sfree(uri);
184       return -1;
185     }
186   } else {
187     uri = ssnprintf_alloc("mongodb://%s:%d", node->host, node->port);
188     if (uri == NULL) {
189       ERROR("write_mongodb plugin: Not enough memory to assemble "
190             "authentication string.");
191       mongoc_client_destroy(node->client);
192       node->client = NULL;
193       node->connected = 0;
194       return -1;
195     }
196
197     node->client = mongoc_client_new(uri);
198     if (!node->client) {
199       ERROR("write_mongodb plugin: Connecting to [%s]:%d failed.", node->host,
200             node->port);
201       node->connected = 0;
202       sfree(uri);
203       return -1;
204     }
205     sfree(uri);
206   }
207
208   node->database = mongoc_client_get_database(node->client, "collectd");
209   if (!node->database) {
210     ERROR("write_mongodb plugin: error creating/getting database");
211     mongoc_client_destroy(node->client);
212     node->client = NULL;
213     node->connected = 0;
214     return -1;
215   }
216
217   node->connected = 1;
218   return 0;
219 } /* }}} int wm_initialize */
220
221 static int wm_write(const data_set_t *ds, /* {{{ */
222                     const value_list_t *vl, user_data_t *ud) {
223   wm_node_t *node = ud->data;
224   mongoc_collection_t *collection = NULL;
225   bson_t *bson_record;
226   bson_error_t error;
227   int status;
228
229   bson_record = wm_create_bson(ds, vl, node->store_rates);
230   if (!bson_record) {
231     ERROR("write_mongodb plugin: error making insert bson");
232     return -1;
233   }
234
235   pthread_mutex_lock(&node->lock);
236   if (wm_initialize(node) < 0) {
237     ERROR("write_mongodb plugin: error making connection to server");
238     pthread_mutex_unlock(&node->lock);
239     bson_destroy(bson_record);
240     return -1;
241   }
242
243   collection =
244       mongoc_client_get_collection(node->client, "collectd", vl->plugin);
245   if (!collection) {
246     ERROR("write_mongodb plugin: error creating/getting collection");
247     mongoc_database_destroy(node->database);
248     mongoc_client_destroy(node->client);
249     node->database = NULL;
250     node->client = NULL;
251     node->connected = 0;
252     pthread_mutex_unlock(&node->lock);
253     bson_destroy(bson_record);
254     return -1;
255   }
256
257   status = mongoc_collection_insert(collection, MONGOC_INSERT_NONE, bson_record,
258                                     NULL, &error);
259
260   if (!status) {
261     ERROR("write_mongodb plugin: error inserting record: %s", error.message);
262     mongoc_database_destroy(node->database);
263     mongoc_client_destroy(node->client);
264     node->database = NULL;
265     node->client = NULL;
266     node->connected = 0;
267     pthread_mutex_unlock(&node->lock);
268     bson_destroy(bson_record);
269     mongoc_collection_destroy(collection);
270     return -1;
271   }
272
273   /* free our resource as not to leak memory */
274   mongoc_collection_destroy(collection);
275
276   pthread_mutex_unlock(&node->lock);
277
278   bson_destroy(bson_record);
279
280   return 0;
281 } /* }}} int wm_write */
282
283 static void wm_config_free(void *ptr) /* {{{ */
284 {
285   wm_node_t *node = ptr;
286
287   if (node == NULL)
288     return;
289
290   mongoc_database_destroy(node->database);
291   mongoc_client_destroy(node->client);
292   node->database = NULL;
293   node->client = NULL;
294   node->connected = 0;
295
296   sfree(node->host);
297   sfree(node);
298 } /* }}} void wm_config_free */
299
300 static int wm_config_node(oconfig_item_t *ci) /* {{{ */
301 {
302   wm_node_t *node;
303   int status;
304
305   node = calloc(1, sizeof(*node));
306   if (node == NULL)
307     return ENOMEM;
308   mongoc_init();
309   node->host = strdup("localhost");
310   if (node->host == NULL) {
311     sfree(node);
312     return ENOMEM;
313   }
314   node->port = MONGOC_DEFAULT_PORT;
315   node->store_rates = 1;
316   pthread_mutex_init(&node->lock, /* attr = */ NULL);
317
318   status = cf_util_get_string_buffer(ci, node->name, sizeof(node->name));
319
320   if (status != 0) {
321     sfree(node->host);
322     sfree(node);
323     return status;
324   }
325
326   for (int i = 0; i < ci->children_num; i++) {
327     oconfig_item_t *child = ci->children + i;
328
329     if (strcasecmp("Host", child->key) == 0)
330       status = cf_util_get_string(child, &node->host);
331     else if (strcasecmp("Port", child->key) == 0) {
332       status = cf_util_get_port_number(child);
333       if (status > 0) {
334         node->port = status;
335         status = 0;
336       }
337     } else if (strcasecmp("Timeout", child->key) == 0)
338       status = cf_util_get_int(child, &node->timeout);
339     else if (strcasecmp("StoreRates", child->key) == 0)
340       status = cf_util_get_boolean(child, &node->store_rates);
341     else if (strcasecmp("Database", child->key) == 0)
342       status = cf_util_get_string(child, &node->db);
343     else if (strcasecmp("User", child->key) == 0)
344       status = cf_util_get_string(child, &node->user);
345     else if (strcasecmp("Password", child->key) == 0)
346       status = cf_util_get_string(child, &node->passwd);
347     else
348       WARNING("write_mongodb plugin: Ignoring unknown config option \"%s\".",
349               child->key);
350
351     if (status != 0)
352       break;
353   } /* for (i = 0; i < ci->children_num; i++) */
354
355   if ((node->db != NULL) || (node->user != NULL) || (node->passwd != NULL)) {
356     if ((node->db == NULL) || (node->user == NULL) || (node->passwd == NULL)) {
357       WARNING(
358           "write_mongodb plugin: Authentication requires the "
359           "\"Database\", \"User\" and \"Password\" options to be specified, "
360           "but at last one of them is missing. Authentication will NOT be "
361           "used.");
362       sfree(node->db);
363       sfree(node->user);
364       sfree(node->passwd);
365     }
366   }
367
368   if (status == 0) {
369     char cb_name[sizeof("write_mongodb/") + DATA_MAX_NAME_LEN];
370
371     snprintf(cb_name, sizeof(cb_name), "write_mongodb/%s", node->name);
372
373     status =
374         plugin_register_write(cb_name, wm_write,
375                               &(user_data_t){
376                                   .data = node, .free_func = wm_config_free,
377                               });
378     INFO("write_mongodb plugin: registered write plugin %s %d", cb_name,
379          status);
380   }
381
382   if (status != 0)
383     wm_config_free(node);
384
385   return status;
386 } /* }}} int wm_config_node */
387
388 static int wm_config(oconfig_item_t *ci) /* {{{ */
389 {
390   for (int i = 0; i < ci->children_num; i++) {
391     oconfig_item_t *child = ci->children + i;
392
393     if (strcasecmp("Node", child->key) == 0)
394       wm_config_node(child);
395     else
396       WARNING("write_mongodb plugin: Ignoring unknown "
397               "configuration option \"%s\" at top level.",
398               child->key);
399   }
400
401   return 0;
402 } /* }}} int wm_config */
403
404 void module_register(void) {
405   plugin_register_complex_config("write_mongodb", wm_config);
406 }