postgresql plugin: Added support for <Result> blocks.
[collectd.git] / src / postgresql.c
1 /**
2  * collectd - src/postgresql.c
3  * Copyright (C) 2008  Sebastian Harl
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Sebastian Harl <sh at tokkee.org>
20  **/
21
22 /*
23  * This module collects PostgreSQL database statistics.
24  */
25
26 #include "collectd.h"
27 #include "common.h"
28
29 #include "configfile.h"
30 #include "plugin.h"
31
32 #include "utils_complain.h"
33
34 #include <pg_config_manual.h>
35 #include <libpq-fe.h>
36
37 #define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
38 #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
39 #define log_info(...) INFO ("postgresql: " __VA_ARGS__)
40
41 #ifndef C_PSQL_DEFAULT_CONF
42 # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
43 #endif
44
45 /* Appends the (parameter, value) pair to the string
46  * pointed to by 'buf' suitable to be used as argument
47  * for PQconnectdb(). If value equals NULL, the pair
48  * is ignored. */
49 #define C_PSQL_PAR_APPEND(buf, buf_len, parameter, value) \
50         if ((0 < (buf_len)) && (NULL != (value)) && ('\0' != *(value))) { \
51                 int s = ssnprintf (buf, buf_len, " %s = '%s'", parameter, value); \
52                 if (0 < s) { \
53                         buf     += s; \
54                         buf_len -= s; \
55                 } \
56         }
57
58 /* Returns the tuple (major, minor, patchlevel)
59  * for the given version number. */
60 #define C_PSQL_SERVER_VERSION3(server_version) \
61         (server_version) / 10000, \
62         (server_version) / 100 - (int)((server_version) / 10000) * 100, \
63         (server_version) - (int)((server_version) / 100) * 100
64
65 /* Returns true if the given host specifies a
66  * UNIX domain socket. */
67 #define C_PSQL_IS_UNIX_DOMAIN_SOCKET(host) \
68         ((NULL == (host)) || ('\0' == *(host)) || ('/' == *(host)))
69
70 /* Returns the tuple (host, delimiter, port) for a
71  * given (host, port) pair. Depending on the value of
72  * 'host' a UNIX domain socket or a TCP socket is
73  * assumed. */
74 #define C_PSQL_SOCKET3(host, port) \
75         ((NULL == (host)) || ('\0' == *(host))) ? DEFAULT_PGSOCKET_DIR : host, \
76         C_PSQL_IS_UNIX_DOMAIN_SOCKET (host) ? "/.s.PGSQL." : ":", \
77         port
78
79 typedef enum {
80         C_PSQL_PARAM_HOST = 1,
81         C_PSQL_PARAM_DB,
82         C_PSQL_PARAM_USER,
83 } c_psql_param_t;
84
85 typedef struct {
86         char  *type;
87         char  *instance_prefix;
88         char **instances_str;
89         int   *instances;
90         int    instances_num;
91         char **values_str; /* may be NULL, even if values_num != 0 in
92                               case the "Column" option has been used */
93         int   *values;
94         int   *ds_types;
95         int    values_num;
96 } c_psql_result_t;
97
98 typedef struct {
99         char *name;
100         char *stmt;
101
102         c_psql_param_t *params;
103         int             params_num;
104
105         c_psql_result_t *results;
106         int              results_num;
107
108         int min_pg_version;
109         int max_pg_version;
110 } c_psql_query_t;
111
112 typedef struct {
113         PGconn      *conn;
114         c_complain_t conn_complaint;
115
116         int proto_version;
117
118         int max_params_num;
119
120         /* user configuration */
121         c_psql_query_t **queries;
122         int             *hidden_queries;
123         int              queries_num;
124
125         char *host;
126         char *port;
127         char *database;
128         char *user;
129         char *password;
130
131         char *sslmode;
132
133         char *krbsrvname;
134
135         char *service;
136 } c_psql_database_t;
137
138 static char *def_queries[] = {
139         "backends",
140         "transactions",
141         "queries",
142         "query_plans",
143         "table_states",
144         "disk_io",
145         "disk_usage"
146 };
147 static int def_queries_num = STATIC_ARRAY_SIZE (def_queries);
148
149 static c_psql_query_t *queries          = NULL;
150 static int             queries_num      = 0;
151
152 static c_psql_database_t *databases     = NULL;
153 static int                databases_num = 0;
154
155 static c_psql_result_t *c_psql_result_new (c_psql_query_t *query)
156 {
157         c_psql_result_t *res;
158
159         ++query->results_num;
160         if (NULL == (query->results = (c_psql_result_t *)realloc (query->results,
161                                         query->results_num * sizeof (*query->results)))) {
162                 log_err ("Out of memory.");
163                 exit (5);
164         }
165         res = query->results + query->results_num - 1;
166
167         res->type    = NULL;
168
169         res->instance_prefix = NULL;
170         res->instances_str   = NULL;
171         res->instances       = NULL;
172         res->instances_num   = 0;
173
174         res->values_str = NULL;
175         res->values     = NULL;
176         res->ds_types   = NULL;
177         res->values_num = 0;
178         return res;
179 } /* c_psql_result_new */
180
181 static void c_psql_result_delete (c_psql_result_t *res)
182 {
183         int i;
184
185         sfree (res->type);
186
187         sfree (res->instance_prefix);
188
189         for (i = 0; i < res->instances_num; ++i)
190                 sfree (res->instances_str[i]);
191         sfree (res->instances_str);
192         sfree (res->instances);
193         res->instances_num = 0;
194
195         for (i = 0; (NULL != res->values_str) && (i < res->values_num); ++i)
196                 sfree (res->values_str[i]);
197         sfree (res->values_str);
198         sfree (res->values);
199         sfree (res->ds_types);
200         res->values_num = 0;
201 } /* c_psql_result_delete */
202
203 static c_psql_query_t *c_psql_query_new (const char *name)
204 {
205         c_psql_query_t *query;
206
207         ++queries_num;
208         if (NULL == (queries = (c_psql_query_t *)realloc (queries,
209                                 queries_num * sizeof (*queries)))) {
210                 log_err ("Out of memory.");
211                 exit (5);
212         }
213         query = queries + queries_num - 1;
214
215         query->name = sstrdup (name);
216         query->stmt = NULL;
217
218         query->params     = NULL;
219         query->params_num = 0;
220
221         query->results     = NULL;
222         query->results_num = 0;
223
224         query->min_pg_version = 0;
225         query->max_pg_version = INT_MAX;
226         return query;
227 } /* c_psql_query_new */
228
229 static int c_psql_query_init (c_psql_query_t *query)
230 {
231         int i;
232
233         /* Get the data set definitions for each query definition. */
234         for (i = 0; i < query->results_num; ++i) {
235                 c_psql_result_t  *res = query->results + i;
236                 const data_set_t *ds;
237
238                 int j;
239
240                 ds = plugin_get_ds (res->type);
241                 if (NULL == ds) {
242                         log_err ("Result: Unknown type \"%s\".", res->type);
243                         return -1;
244                 }
245
246                 if (res->values_num != ds->ds_num) {
247                         log_err ("Result: Invalid type \"%s\" - "
248                                         "expected %i data source%s, got %i.",
249                                         res->type, res->values_num,
250                                         (1 == res->values_num) ? "" : "s",
251                                         ds->ds_num);
252                         return -1;
253                 }
254
255                 for (j = 0; j < res->values_num; ++j)
256                         res->ds_types[j] = ds->ds[j].type;
257         }
258         return 0;
259 } /* c_psql_query_init */
260
261 static void c_psql_query_delete (c_psql_query_t *query)
262 {
263         int i;
264
265         sfree (query->name);
266         sfree (query->stmt);
267
268         sfree (query->params);
269         query->params_num = 0;
270
271         for (i = 0; i < query->results_num; ++i)
272                 c_psql_result_delete (query->results + i);
273         sfree (query->results);
274         query->results_num = 0;
275         return;
276 } /* c_psql_query_delete */
277
278 static c_psql_query_t *c_psql_query_get (const char *name, int server_version)
279 {
280         int i;
281
282         for (i = 0; i < queries_num; ++i)
283                 if (0 == strcasecmp (name, queries[i].name)
284                                 && ((-1 == server_version)
285                                         || ((queries[i].min_pg_version <= server_version)
286                                                 && (server_version <= queries[i].max_pg_version))))
287                         return queries + i;
288         return NULL;
289 } /* c_psql_query_get */
290
291 static c_psql_database_t *c_psql_database_new (const char *name)
292 {
293         c_psql_database_t *db;
294
295         ++databases_num;
296         if (NULL == (databases = (c_psql_database_t *)realloc (databases,
297                                 databases_num * sizeof (*databases)))) {
298                 log_err ("Out of memory.");
299                 exit (5);
300         }
301
302         db = databases + (databases_num - 1);
303
304         db->conn = NULL;
305
306         C_COMPLAIN_INIT (&db->conn_complaint);
307
308         db->proto_version = 0;
309
310         db->max_params_num = 0;
311
312         db->queries        = NULL;
313         db->hidden_queries = NULL;
314         db->queries_num    = 0;
315
316         db->database   = sstrdup (name);
317         db->host       = NULL;
318         db->port       = NULL;
319         db->user       = NULL;
320         db->password   = NULL;
321
322         db->sslmode    = NULL;
323
324         db->krbsrvname = NULL;
325
326         db->service    = NULL;
327         return db;
328 } /* c_psql_database_new */
329
330 static void c_psql_database_init (c_psql_database_t *db, int server_version)
331 {
332         int i;
333
334         /* Get the right version of each query definition. */
335         for (i = 0; i < db->queries_num; ++i) {
336                 c_psql_query_t *tmp;
337
338                 tmp = c_psql_query_get (db->queries[i]->name, server_version);
339
340                 if (tmp == db->queries[i])
341                         continue;
342
343                 if (NULL == tmp) {
344                         log_err ("Query \"%s\" not found for server version %i - "
345                                         "please check your configuration.",
346                                         db->queries[i]->name, server_version);
347                         /* By hiding the query (rather than removing it from the list) we
348                          * don't lose it in case a reconnect to an available version
349                          * happens at a later time. */
350                         db->hidden_queries[i] = 1;
351                         continue;
352                 }
353
354                 db->hidden_queries[i] = 0;
355                 db->queries[i] = tmp;
356         }
357 } /* c_psql_database_init */
358
359 static void c_psql_database_delete (c_psql_database_t *db)
360 {
361         PQfinish (db->conn);
362         db->conn = NULL;
363
364         sfree (db->queries);
365         sfree (db->hidden_queries);
366         db->queries_num = 0;
367
368         sfree (db->database);
369         sfree (db->host);
370         sfree (db->port);
371         sfree (db->user);
372         sfree (db->password);
373
374         sfree (db->sslmode);
375
376         sfree (db->krbsrvname);
377
378         sfree (db->service);
379         return;
380 } /* c_psql_database_delete */
381
382 static void submit (const c_psql_database_t *db, const c_psql_result_t *res,
383                 char **instances, value_t *values)
384 {
385         value_list_t vl = VALUE_LIST_INIT;
386
387         int instances_num = res->instances_num;
388
389         if (NULL != res->instance_prefix)
390                 ++instances_num;
391
392         vl.values     = values;
393         vl.values_len = res->values_num;
394         vl.time       = time (NULL);
395
396         if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host)
397                         || (0 == strcmp (db->host, "localhost")))
398                 sstrncpy (vl.host, hostname_g, sizeof (vl.host));
399         else
400                 sstrncpy (vl.host, db->host, sizeof (vl.host));
401
402         sstrncpy (vl.plugin, "postgresql", sizeof (vl.plugin));
403         sstrncpy (vl.plugin_instance, db->database, sizeof (vl.plugin_instance));
404
405         sstrncpy (vl.type, res->type, sizeof (vl.type));
406
407         if (0 < instances_num) {
408                 vl.type_instance[sizeof (vl.type_instance) - 1] = '\0';
409                 strjoin (vl.type_instance, sizeof (vl.type_instance),
410                                 instances, instances_num, "-");
411
412                 if ('\0' != vl.type_instance[sizeof (vl.type_instance) - 1]) {
413                         vl.type_instance[sizeof (vl.type_instance) - 1] = '\0';
414                         log_warn ("Truncated type instance: %s.", vl.type_instance);
415                 }
416         }
417
418         plugin_dispatch_values (&vl);
419         return;
420 } /* submit */
421
422 static int c_psql_get_colnum (PGresult *pgres,
423                 char **strings, int *numbers, int idx)
424 {
425         int colnum;
426
427         if (0 <= numbers[idx])
428                 return numbers[idx];
429
430         colnum = PQfnumber (pgres, strings[idx]);
431         if (0 > colnum)
432                 log_err ("No such column: %s.", strings[idx]);
433
434         numbers[idx] = colnum;
435         return colnum;
436 } /* c_psql_get_colnum */
437
438 static void c_psql_dispatch_row (c_psql_database_t *db, c_psql_query_t *query,
439                 PGresult *pgres, int row)
440 {
441         int i;
442
443         for (i = 0; i < query->results_num; ++i) {
444                 c_psql_result_t *res = query->results + i;
445
446                 char   *instances[res->instances_num + 1];
447                 value_t values[res->values_num];
448
449                 int offset = 0, status = 0, j;
450
451                 /* get the instance name */
452                 if (NULL != res->instance_prefix) {
453                         instances[0] = res->instance_prefix;
454                         offset = 1;
455                 }
456
457                 for (j = 0; (0 == status) && (j < res->instances_num); ++j) {
458                         int col = c_psql_get_colnum (pgres,
459                                         res->instances_str, res->instances, j);
460
461                         if (0 > col) {
462                                 status = -1;
463                                 break;
464                         }
465
466                         instances[j + offset] = PQgetvalue (pgres, row, col);
467                         if (NULL == instances[j + offset])
468                                 instances[j + offset] = "";
469                 }
470
471                 /* get the values */
472                 for (j = 0; (0 == status) && (j < res->values_num); ++j) {
473                         int col = c_psql_get_colnum (pgres,
474                                         res->values_str, res->values, j);
475
476                         char *value_str;
477                         char *endptr = NULL;
478
479                         if (0 > col) {
480                                 status = -1;
481                                 break;
482                         }
483
484                         value_str = PQgetvalue (pgres, row, col);
485                         if ((NULL == value_str) || ('\0' == *value_str))
486                                 value_str = "0";
487
488                         if (res->ds_types[j] == DS_TYPE_COUNTER)
489                                 values[j].counter = (counter_t)strtoll (value_str, &endptr, 0);
490                         else if (res->ds_types[j] == DS_TYPE_GAUGE)
491                                 values[j].gauge = (gauge_t)strtod (value_str, &endptr);
492                         else {
493                                 log_err ("Invalid type \"%s\" (%i).",
494                                                 res->type, res->ds_types[j]);
495                         }
496
497                         if (value_str == endptr) {
498                                 log_err ("Failed to parse string as number: %s.", value_str);
499                                 status = -1;
500                                 break;
501                         }
502                         else if ((NULL != endptr) && ('\0' != *endptr))
503                                 log_warn ("Ignoring trailing garbage after number: %s.",
504                                                 endptr);
505                 }
506
507                 if (0 != status)
508                         continue;
509
510                 submit (db, res, instances, values);
511         }
512 } /* c_psql_dispatch_row */
513
514 static int c_psql_check_connection (c_psql_database_t *db)
515 {
516         /* "ping" */
517         PQclear (PQexec (db->conn, "SELECT 42;"));
518
519         if (CONNECTION_OK != PQstatus (db->conn)) {
520                 PQreset (db->conn);
521
522                 /* trigger c_release() */
523                 if (0 == db->conn_complaint.interval)
524                         db->conn_complaint.interval = 1;
525
526                 if (CONNECTION_OK != PQstatus (db->conn)) {
527                         c_complain (LOG_ERR, &db->conn_complaint,
528                                         "Failed to connect to database %s: %s",
529                                         db->database, PQerrorMessage (db->conn));
530                         return -1;
531                 }
532
533                 db->proto_version = PQprotocolVersion (db->conn);
534                 if (3 > db->proto_version)
535                         log_warn ("Protocol version %d does not support parameters.",
536                                         db->proto_version);
537         }
538
539         /* We might have connected to a different PostgreSQL version, so we
540          * need to reinitialize stuff. */
541         if (c_would_release (&db->conn_complaint))
542                 c_psql_database_init (db, PQserverVersion (db->conn));
543
544         c_release (LOG_INFO, &db->conn_complaint,
545                         "Successfully reconnected to database %s", PQdb (db->conn));
546         return 0;
547 } /* c_psql_check_connection */
548
549 static PGresult *c_psql_exec_query_params (c_psql_database_t *db,
550                 c_psql_query_t *query)
551 {
552         char *params[db->max_params_num];
553         int   i;
554
555         assert (db->max_params_num >= query->params_num);
556
557         for (i = 0; i < query->params_num; ++i) {
558                 switch (query->params[i]) {
559                         case C_PSQL_PARAM_HOST:
560                                 params[i] = C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host)
561                                         ? "localhost" : db->host;
562                                 break;
563                         case C_PSQL_PARAM_DB:
564                                 params[i] = db->database;
565                                 break;
566                         case C_PSQL_PARAM_USER:
567                                 params[i] = db->user;
568                                 break;
569                         default:
570                                 assert (0);
571                 }
572         }
573
574         return PQexecParams (db->conn, query->stmt, query->params_num, NULL,
575                         (const char *const *)((0 == query->params_num) ? NULL : params),
576                         NULL, NULL, /* return text data */ 0);
577 } /* c_psql_exec_query_params */
578
579 static PGresult *c_psql_exec_query_noparams (c_psql_database_t *db,
580                 c_psql_query_t *query)
581 {
582         return PQexec (db->conn, query->stmt);
583 } /* c_psql_exec_query_noparams */
584
585 static int c_psql_exec_query (c_psql_database_t *db, int idx)
586 {
587         c_psql_query_t *query;
588         PGresult       *res;
589
590         int rows, cols;
591         int i;
592
593         if (idx >= db->queries_num)
594                 return -1;
595
596         if (0 != db->hidden_queries[idx])
597                 return 0;
598
599         query = db->queries[idx];
600
601         if (3 <= db->proto_version)
602                 res = c_psql_exec_query_params (db, query);
603         else if (0 == query->params_num)
604                 res = c_psql_exec_query_noparams (db, query);
605         else {
606                 log_err ("Connection to database \"%s\" does not support parameters "
607                                 "(protocol version %d) - cannot execute query \"%s\".",
608                                 db->database, db->proto_version, query->name);
609                 return -1;
610         }
611
612         if (PGRES_TUPLES_OK != PQresultStatus (res)) {
613                 log_err ("Failed to execute SQL query: %s",
614                                 PQerrorMessage (db->conn));
615                 log_info ("SQL query was: %s", query->stmt);
616                 PQclear (res);
617                 return -1;
618         }
619
620         rows = PQntuples (res);
621         if (1 > rows) {
622                 PQclear (res);
623                 return 0;
624         }
625
626         cols = PQnfields (res);
627
628         for (i = 0; i < rows; ++i)
629                 c_psql_dispatch_row (db, query, res, i);
630         PQclear (res);
631         return 0;
632 } /* c_psql_exec_query */
633
634 static int c_psql_read (void)
635 {
636         int success = 0;
637         int i;
638
639         for (i = 0; i < databases_num; ++i) {
640                 c_psql_database_t *db = databases + i;
641
642                 int j;
643
644                 assert (NULL != db->database);
645
646                 if (0 != c_psql_check_connection (db))
647                         continue;
648
649                 for (j = 0; j < db->queries_num; ++j)
650                         c_psql_exec_query (db, j);
651
652                 ++success;
653         }
654
655         if (! success)
656                 return -1;
657         return 0;
658 } /* c_psql_read */
659
660 static int c_psql_shutdown (void)
661 {
662         int i;
663
664         if ((NULL == databases) || (0 == databases_num))
665                 return 0;
666
667         plugin_unregister_read ("postgresql");
668         plugin_unregister_shutdown ("postgresql");
669
670         for (i = 0; i < databases_num; ++i) {
671                 c_psql_database_t *db = databases + i;
672                 c_psql_database_delete (db);
673         }
674
675         sfree (databases);
676         databases_num = 0;
677
678         for (i = 0; i < queries_num; ++i) {
679                 c_psql_query_t *query = queries + i;
680                 c_psql_query_delete (query);
681         }
682
683         sfree (queries);
684         queries_num = 0;
685         return 0;
686 } /* c_psql_shutdown */
687
688 static int c_psql_init (void)
689 {
690         int i;
691
692         if ((NULL == databases) || (0 == databases_num))
693                 return 0;
694
695         for (i = 0; i < queries_num; ++i)
696                 if (0 != c_psql_query_init (queries + i)) {
697                         c_psql_shutdown ();
698                         return -1;
699                 }
700
701         for (i = 0; i < databases_num; ++i) {
702                 c_psql_database_t *db = databases + i;
703
704                 char  conninfo[4096];
705                 char *buf     = conninfo;
706                 int   buf_len = sizeof (conninfo);
707                 int   status;
708
709                 char *server_host;
710                 int   server_version;
711
712                 /* this will happen during reinitialization */
713                 if (NULL != db->conn) {
714                         c_psql_check_connection (db);
715                         continue;
716                 }
717
718                 status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database);
719                 if (0 < status) {
720                         buf     += status;
721                         buf_len -= status;
722                 }
723
724                 C_PSQL_PAR_APPEND (buf, buf_len, "host",       db->host);
725                 C_PSQL_PAR_APPEND (buf, buf_len, "port",       db->port);
726                 C_PSQL_PAR_APPEND (buf, buf_len, "user",       db->user);
727                 C_PSQL_PAR_APPEND (buf, buf_len, "password",   db->password);
728                 C_PSQL_PAR_APPEND (buf, buf_len, "sslmode",    db->sslmode);
729                 C_PSQL_PAR_APPEND (buf, buf_len, "krbsrvname", db->krbsrvname);
730                 C_PSQL_PAR_APPEND (buf, buf_len, "service",    db->service);
731
732                 db->conn = PQconnectdb (conninfo);
733                 if (0 != c_psql_check_connection (db))
734                         continue;
735
736                 db->proto_version = PQprotocolVersion (db->conn);
737
738                 server_host    = PQhost (db->conn);
739                 server_version = PQserverVersion (db->conn);
740                 log_info ("Sucessfully connected to database %s (user %s) "
741                                 "at server %s%s%s (server version: %d.%d.%d, "
742                                 "protocol version: %d, pid: %d)",
743                                 PQdb (db->conn), PQuser (db->conn),
744                                 C_PSQL_SOCKET3 (server_host, PQport (db->conn)),
745                                 C_PSQL_SERVER_VERSION3 (server_version),
746                                 db->proto_version, PQbackendPID (db->conn));
747
748                 if (3 > db->proto_version)
749                         log_warn ("Protocol version %d does not support parameters.",
750                                         db->proto_version);
751
752                 c_psql_database_init (db, server_version);
753         }
754
755         plugin_register_read ("postgresql", c_psql_read);
756         plugin_register_shutdown ("postgresql", c_psql_shutdown);
757         return 0;
758 } /* c_psql_init */
759
760 static int config_set_s (char *name, char **var, const oconfig_item_t *ci)
761 {
762         if ((0 != ci->children_num) || (1 != ci->values_num)
763                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
764                 log_err ("%s expects a single string argument.", name);
765                 return 1;
766         }
767
768         sfree (*var);
769         *var = sstrdup (ci->values[0].value.string);
770         return 0;
771 } /* config_set_s */
772
773 static int config_set_i (char *name, int *var, const oconfig_item_t *ci)
774 {
775         if ((0 != ci->children_num) || (1 != ci->values_num)
776                         || (OCONFIG_TYPE_NUMBER != ci->values[0].type)) {
777                 log_err ("%s expects a single number argument.", name);
778                 return 1;
779         }
780
781         *var = (int)ci->values[0].value.number;
782         return 0;
783 } /* config_set_i */
784
785 static int config_append_array_s (char *name, char ***var, int *len,
786                 const oconfig_item_t *ci)
787 {
788         int i;
789
790         if ((0 != ci->children_num) || (1 > ci->values_num)) {
791                 log_err ("%s expects at least one argument.", name);
792                 return 1;
793         }
794
795         for (i = 0; i < ci->values_num; ++i) {
796                 if (OCONFIG_TYPE_STRING != ci->values[i].type) {
797                         log_err ("%s expects string arguments.", name);
798                         return 1;
799                 }
800         }
801
802         *len += ci->values_num;
803         if (NULL == (*var = (char **)realloc (*var, *len * sizeof (**var)))) {
804                 log_err ("Out of memory.");
805                 exit (5);
806         }
807
808         for (i = *len - ci->values_num; i < *len; ++i)
809                 (*var)[i] = sstrdup (ci->values[i].value.string);
810         return 0;
811 } /* config_append_array_s */
812
813 static int config_set_param (c_psql_query_t *query, const oconfig_item_t *ci)
814 {
815         c_psql_param_t param;
816         char          *param_str;
817
818         if ((0 != ci->children_num) || (1 != ci->values_num)
819                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
820                 log_err ("Param expects a single string argument.");
821                 return 1;
822         }
823
824         param_str = ci->values[0].value.string;
825         if (0 == strcasecmp (param_str, "hostname"))
826                 param = C_PSQL_PARAM_HOST;
827         else if (0 == strcasecmp (param_str, "database"))
828                 param = C_PSQL_PARAM_DB;
829         else if (0 == strcasecmp (param_str, "username"))
830                 param = C_PSQL_PARAM_USER;
831         else {
832                 log_err ("Invalid parameter \"%s\".", param_str);
833                 return 1;
834         }
835
836         ++query->params_num;
837         if (NULL == (query->params = (c_psql_param_t *)realloc (query->params,
838                                 query->params_num * sizeof (*query->params)))) {
839                 log_err ("Out of memory.");
840                 exit (5);
841         }
842
843         query->params[query->params_num - 1] = param;
844         return 0;
845 } /* config_set_param */
846
847 static int config_set_result (c_psql_query_t *query, const oconfig_item_t *ci)
848 {
849         c_psql_result_t *res;
850
851         int status = 0, i;
852
853         if (0 != ci->values_num) {
854                 log_err ("<Result> does not expect any arguments.");
855                 return 1;
856         }
857
858         res = c_psql_result_new (query);
859
860         for (i = 0; i < ci->children_num; ++i) {
861                 oconfig_item_t *c = ci->children + i;
862
863                 if (0 == strcasecmp (c->key, "Type"))
864                         config_set_s ("Type", &res->type, c);
865                 else if (0 == strcasecmp (c->key, "InstancePrefix"))
866                         config_set_s ("InstancePrefix", &res->instance_prefix, c);
867                 else if (0 == strcasecmp (c->key, "InstancesFrom"))
868                         config_append_array_s ("InstancesFrom",
869                                         &res->instances_str, &res->instances_num, c);
870                 else if (0 == strcasecmp (c->key, "ValuesFrom"))
871                         config_append_array_s ("ValuesFrom",
872                                         &res->values_str, &res->values_num, c);
873                 else
874                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
875         }
876
877         if (NULL == res->type) {
878                 log_warn ("Query \"%s\": Missing Type option in <Result> block.",
879                                 query->name);
880                 status = 1;
881         }
882
883         if (NULL == res->values_str) {
884                 log_warn ("Query \"%s\": Missing ValuesFrom option in <Result> block.",
885                                 query->name);
886                 status = 1;
887         }
888
889         if (0 != status) {
890                 c_psql_result_delete (res);
891                 --query->results_num;
892                 return status;
893         }
894
895         /* preallocate memory to cache the column numbers and data types */
896         res->values = (int *)smalloc (res->values_num * sizeof (*res->values));
897         for (i = 0; i < res->values_num; ++i)
898                 res->values[i] = -1;
899
900         res->instances = (int *)smalloc (res->instances_num
901                         * sizeof (*res->instances));
902         for (i = 0; i < res->instances_num; ++i)
903                 res->instances[i] = -1;
904
905         res->ds_types = (int *)smalloc (res->values_num
906                         * sizeof (*res->ds_types));
907         for (i = 0; i < res->values_num; ++i)
908                 res->ds_types[i] = -1;
909         return 0;
910 } /* config_set_result */
911
912 static int config_set_column (c_psql_query_t *query, int col_num,
913                 const oconfig_item_t *ci)
914 {
915         c_psql_result_t *res;
916
917         int i;
918
919         if ((0 != ci->children_num)
920                         || (1 > ci->values_num) || (2 < ci->values_num)) {
921                 log_err ("Column expects either one or two arguments.");
922                 return 1;
923         }
924
925         for (i = 0; i < ci->values_num; ++i) {
926                 if (OCONFIG_TYPE_STRING != ci->values[i].type) {
927                         log_err ("Column expects either one or two string arguments.");
928                         return 1;
929                 }
930         }
931
932         res = c_psql_result_new (query);
933
934         res->type = sstrdup (ci->values[0].value.string);
935
936         if (2 == ci->values_num)
937                 res->instance_prefix = sstrdup (ci->values[1].value.string);
938
939         res->values     = (int *)smalloc (sizeof (*res->values));
940         res->values[0]  = col_num;
941         res->ds_types   = (int *)smalloc (sizeof (*res->ds_types));
942         res->values_num = 1;
943         return 0;
944 } /* config_set_column */
945
946 static int set_query (c_psql_database_t *db, const char *name)
947 {
948         c_psql_query_t *query;
949
950         query = c_psql_query_get (name, -1);
951         if (NULL == query) {
952                 log_err ("Query \"%s\" not found - please check your configuration.",
953                                 name);
954                 return 1;
955         }
956
957         ++db->queries_num;
958         if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries,
959                                 db->queries_num * sizeof (*db->queries)))) {
960                 log_err ("Out of memory.");
961                 exit (5);
962         }
963
964         if (query->params_num > db->max_params_num)
965                 db->max_params_num = query->params_num;
966
967         db->queries[db->queries_num - 1] = query;
968         return 0;
969 } /* set_query */
970
971 static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci)
972 {
973         if ((0 != ci->children_num) || (1 != ci->values_num)
974                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
975                 log_err ("Query expects a single string argument.");
976                 return 1;
977         }
978         return set_query (db, ci->values[0].value.string);
979 } /* config_set_query */
980
981 static int c_psql_config_query (oconfig_item_t *ci)
982 {
983         c_psql_query_t *query;
984
985         int status = 0, col_num = 0, i;
986
987         if ((1 != ci->values_num)
988                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
989                 log_err ("<Query> expects a single string argument.");
990                 return 1;
991         }
992
993         query = c_psql_query_new (ci->values[0].value.string);
994
995         for (i = 0; i < ci->children_num; ++i) {
996                 oconfig_item_t *c = ci->children + i;
997
998                 if (0 == strcasecmp (c->key, "Statement"))
999                         config_set_s ("Statement", &query->stmt, c);
1000                 /* backwards compat for versions < 4.6 */
1001                 else if (0 == strcasecmp (c->key, "Query")) {
1002                         log_warn ("<Query>: 'Query' is deprecated - use 'Statement' instead.");
1003                         config_set_s ("Query", &query->stmt, c);
1004                 }
1005                 else if (0 == strcasecmp (c->key, "Param"))
1006                         config_set_param (query, c);
1007                 else if (0 == strcasecmp (c->key, "Result"))
1008                         config_set_result (query, c);
1009                 /* backwards compat for versions < 4.6 */
1010                 else if (0 == strcasecmp (c->key, "Column")) {
1011                         log_warn ("<Query>: 'Column' is deprecated - "
1012                                         "use a <Result> block instead.");
1013                         config_set_column (query, col_num, c);
1014                         ++col_num;
1015                 }
1016                 else if (0 == strcasecmp (c->key, "MinPGVersion"))
1017                         config_set_i ("MinPGVersion", &query->min_pg_version, c);
1018                 else if (0 == strcasecmp (c->key, "MaxPGVersion"))
1019                         config_set_i ("MaxPGVersion", &query->max_pg_version, c);
1020                 else
1021                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
1022         }
1023
1024         for (i = 0; i < queries_num - 1; ++i) {
1025                 c_psql_query_t *q = queries + i;
1026
1027                 if ((0 == strcasecmp (q->name, query->name))
1028                                 && (q->min_pg_version <= query->max_pg_version)
1029                                 && (query->min_pg_version <= q->max_pg_version)) {
1030                         log_err ("Ignoring redefinition (with overlapping version ranges) "
1031                                         "of query \"%s\".", query->name);
1032                         status = 1;
1033                         break;
1034                 }
1035         }
1036
1037         if (query->min_pg_version > query->max_pg_version) {
1038                 log_err ("Query \"%s\": MinPGVersion > MaxPGVersion.",
1039                                 query->name);
1040                 status = 1;
1041         }
1042
1043         if (NULL == query->stmt) {
1044                 log_err ("Query \"%s\" does not include an SQL query statement - "
1045                                 "please check your configuration.", query->name);
1046                 status = 1;
1047         }
1048
1049         if (0 != status) {
1050                 c_psql_query_delete (query);
1051                 --queries_num;
1052                 return status;
1053         }
1054         return 0;
1055 } /* c_psql_config_query */
1056
1057 static int c_psql_config_database (oconfig_item_t *ci)
1058 {
1059         c_psql_database_t *db;
1060
1061         int i;
1062
1063         if ((1 != ci->values_num)
1064                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
1065                 log_err ("<Database> expects a single string argument.");
1066                 return 1;
1067         }
1068
1069         db = c_psql_database_new (ci->values[0].value.string);
1070
1071         for (i = 0; i < ci->children_num; ++i) {
1072                 oconfig_item_t *c = ci->children + i;
1073
1074                 if (0 == strcasecmp (c->key, "Host"))
1075                         config_set_s ("Host", &db->host, c);
1076                 else if (0 == strcasecmp (c->key, "Port"))
1077                         config_set_s ("Port", &db->port, c);
1078                 else if (0 == strcasecmp (c->key, "User"))
1079                         config_set_s ("User", &db->user, c);
1080                 else if (0 == strcasecmp (c->key, "Password"))
1081                         config_set_s ("Password", &db->password, c);
1082                 else if (0 == strcasecmp (c->key, "SSLMode"))
1083                         config_set_s ("SSLMode", &db->sslmode, c);
1084                 else if (0 == strcasecmp (c->key, "KRBSrvName"))
1085                         config_set_s ("KRBSrvName", &db->krbsrvname, c);
1086                 else if (0 == strcasecmp (c->key, "Service"))
1087                         config_set_s ("Service", &db->service, c);
1088                 else if (0 == strcasecmp (c->key, "Query"))
1089                         config_set_query (db, c);
1090                 else
1091                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
1092         }
1093
1094         if (NULL == db->queries) {
1095                 for (i = 0; i < def_queries_num; ++i)
1096                         set_query (db, def_queries[i]);
1097         }
1098
1099         db->hidden_queries = (int *)calloc (db->queries_num,
1100                         sizeof (*db->hidden_queries));
1101         if (NULL == db->hidden_queries) {
1102                 log_err ("Out of memory.");
1103                 exit (5);
1104         }
1105         return 0;
1106 } /* c_psql_config_database */
1107
1108 static int c_psql_config (oconfig_item_t *ci)
1109 {
1110         static int have_def_config = 0;
1111
1112         int i;
1113
1114         if (0 == have_def_config) {
1115                 oconfig_item_t *c;
1116
1117                 have_def_config = 1;
1118
1119                 c = oconfig_parse_file (C_PSQL_DEFAULT_CONF);
1120                 if (NULL == c)
1121                         log_err ("Failed to read default config ("C_PSQL_DEFAULT_CONF").");
1122                 else
1123                         c_psql_config (c);
1124
1125                 if (NULL == queries)
1126                         log_err ("Default config ("C_PSQL_DEFAULT_CONF") did not define "
1127                                         "any queries - please check your installation.");
1128         }
1129
1130         for (i = 0; i < ci->children_num; ++i) {
1131                 oconfig_item_t *c = ci->children + i;
1132
1133                 if (0 == strcasecmp (c->key, "Query"))
1134                         c_psql_config_query (c);
1135                 else if (0 == strcasecmp (c->key, "Database"))
1136                         c_psql_config_database (c);
1137                 else
1138                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
1139         }
1140         return 0;
1141 } /* c_psql_config */
1142
1143 void module_register (void)
1144 {
1145         plugin_register_complex_config ("postgresql", c_psql_config);
1146         plugin_register_init ("postgresql", c_psql_init);
1147 } /* module_register */
1148
1149 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */
1150