postgresql plugin: Define default queries in a configuration file.
[collectd.git] / src / postgresql.c
1 /**
2  * collectd - src/postgresql.c
3  * Copyright (C) 2008  Sebastian Harl
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Sebastian Harl <sh at tokkee.org>
20  **/
21
22 /*
23  * This module collects PostgreSQL database statistics.
24  */
25
26 #include "collectd.h"
27 #include "common.h"
28
29 #include "configfile.h"
30 #include "plugin.h"
31
32 #include "utils_complain.h"
33
34 #include <pg_config_manual.h>
35 #include <libpq-fe.h>
36
37 #define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
38 #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
39 #define log_info(...) INFO ("postgresql: " __VA_ARGS__)
40
41 #ifndef C_PSQL_DEFAULT_CONF
42 # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
43 #endif
44
45 /* Appends the (parameter, value) pair to the string
46  * pointed to by 'buf' suitable to be used as argument
47  * for PQconnectdb(). If value equals NULL, the pair
48  * is ignored. */
49 #define C_PSQL_PAR_APPEND(buf, buf_len, parameter, value) \
50         if ((0 < (buf_len)) && (NULL != (value)) && ('\0' != *(value))) { \
51                 int s = ssnprintf (buf, buf_len, " %s = '%s'", parameter, value); \
52                 if (0 < s) { \
53                         buf     += s; \
54                         buf_len -= s; \
55                 } \
56         }
57
58 /* Returns the tuple (major, minor, patchlevel)
59  * for the given version number. */
60 #define C_PSQL_SERVER_VERSION3(server_version) \
61         (server_version) / 10000, \
62         (server_version) / 100 - (int)((server_version) / 10000) * 100, \
63         (server_version) - (int)((server_version) / 100) * 100
64
65 /* Returns true if the given host specifies a
66  * UNIX domain socket. */
67 #define C_PSQL_IS_UNIX_DOMAIN_SOCKET(host) \
68         ((NULL == (host)) || ('\0' == *(host)) || ('/' == *(host)))
69
70 /* Returns the tuple (host, delimiter, port) for a
71  * given (host, port) pair. Depending on the value of
72  * 'host' a UNIX domain socket or a TCP socket is
73  * assumed. */
74 #define C_PSQL_SOCKET3(host, port) \
75         ((NULL == (host)) || ('\0' == *(host))) ? DEFAULT_PGSOCKET_DIR : host, \
76         C_PSQL_IS_UNIX_DOMAIN_SOCKET (host) ? "/.s.PGSQL." : ":", \
77         port
78
79 typedef struct {
80         char *type;
81         char *type_instance;
82         int   ds_type;
83 } c_psql_col_t;
84
85 typedef struct {
86         char *name;
87         char *query;
88
89         c_psql_col_t *cols;
90         int           cols_num;
91 } c_psql_query_t;
92
93 typedef struct {
94         PGconn      *conn;
95         c_complain_t conn_complaint;
96
97         /* user configuration */
98         c_psql_query_t **queries;
99         int              queries_num;
100
101         char *host;
102         char *port;
103         char *database;
104         char *user;
105         char *password;
106
107         char *sslmode;
108
109         char *krbsrvname;
110
111         char *service;
112 } c_psql_database_t;
113
114 static char *def_queries[] = {
115         "user_tables",
116         "io_user_tables"
117 };
118 static int def_queries_num = STATIC_ARRAY_SIZE (def_queries);
119
120 static c_psql_query_t *queries          = NULL;
121 static int             queries_num      = 0;
122
123 static c_psql_database_t *databases     = NULL;
124 static int                databases_num = 0;
125
126 static c_psql_query_t *c_psql_query_new (const char *name)
127 {
128         c_psql_query_t *query;
129
130         ++queries_num;
131         if (NULL == (queries = (c_psql_query_t *)realloc (queries,
132                                 queries_num * sizeof (*queries)))) {
133                 log_err ("Out of memory.");
134                 exit (5);
135         }
136         query = queries + queries_num - 1;
137
138         query->name  = sstrdup (name);
139         query->query = NULL;
140
141         query->cols     = NULL;
142         query->cols_num = 0;
143         return query;
144 } /* c_psql_query_new */
145
146 static void c_psql_query_delete (c_psql_query_t *query)
147 {
148         int i;
149
150         sfree (query->name);
151         sfree (query->query);
152
153         for (i = 0; i < query->cols_num; ++i) {
154                 sfree (query->cols[i].type);
155                 sfree (query->cols[i].type_instance);
156         }
157         sfree (query->cols);
158         query->cols_num = 0;
159         return;
160 } /* c_psql_query_delete */
161
162 static c_psql_query_t *c_psql_query_get (const char *name)
163 {
164         int i;
165
166         for (i = 0; i < queries_num; ++i)
167                 if (0 == strcasecmp (name, queries[i].name))
168                         return queries + i;
169         return NULL;
170 } /* c_psql_query_get */
171
172 static c_psql_database_t *c_psql_database_new (const char *name)
173 {
174         c_psql_database_t *db;
175
176         ++databases_num;
177         if (NULL == (databases = (c_psql_database_t *)realloc (databases,
178                                 databases_num * sizeof (*databases)))) {
179                 log_err ("Out of memory.");
180                 exit (5);
181         }
182
183         db = databases + (databases_num - 1);
184
185         db->conn = NULL;
186
187         db->conn_complaint.last     = 0;
188         db->conn_complaint.interval = 0;
189
190         db->queries     = NULL;
191         db->queries_num = 0;
192
193         db->database   = sstrdup (name);
194         db->host       = NULL;
195         db->port       = NULL;
196         db->user       = NULL;
197         db->password   = NULL;
198
199         db->sslmode    = NULL;
200
201         db->krbsrvname = NULL;
202
203         db->service    = NULL;
204         return db;
205 } /* c_psql_database_new */
206
207 static void c_psql_database_delete (c_psql_database_t *db)
208 {
209         PQfinish (db->conn);
210
211         sfree (db->queries);
212         db->queries_num = 0;
213
214         sfree (db->database);
215         sfree (db->host);
216         sfree (db->port);
217         sfree (db->user);
218         sfree (db->password);
219
220         sfree (db->sslmode);
221
222         sfree (db->krbsrvname);
223
224         sfree (db->service);
225         return;
226 } /* c_psql_database_delete */
227
228 static void submit (const c_psql_database_t *db,
229                 const char *type, const char *type_instance,
230                 value_t *values, size_t values_len)
231 {
232         value_list_t vl = VALUE_LIST_INIT;
233
234         vl.values     = values;
235         vl.values_len = values_len;
236         vl.time       = time (NULL);
237
238         if (C_PSQL_IS_UNIX_DOMAIN_SOCKET (db->host)
239                         || (0 == strcmp (db->host, "localhost")))
240                 sstrncpy (vl.host, hostname_g, sizeof (vl.host));
241         else
242                 sstrncpy (vl.host, db->host, sizeof (vl.host));
243
244         sstrncpy (vl.plugin, "postgresql", sizeof (vl.plugin));
245         sstrncpy (vl.plugin_instance, db->database, sizeof (vl.plugin_instance));
246
247         sstrncpy (vl.type, type, sizeof (vl.type));
248
249         if (NULL != type_instance)
250                 sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
251
252         plugin_dispatch_values (&vl);
253         return;
254 } /* submit */
255
256 static void submit_counter (const c_psql_database_t *db,
257                 const char *type, const char *type_instance,
258                 const char *value)
259 {
260         value_t values[1];
261
262         if ((NULL == value) || ('\0' == *value))
263                 return;
264
265         values[0].counter = atoll (value);
266         submit (db, type, type_instance, values, 1);
267         return;
268 } /* submit_counter */
269
270 static void submit_gauge (const c_psql_database_t *db,
271                 const char *type, const char *type_instance,
272                 const char *value)
273 {
274         value_t values[1];
275
276         if ((NULL == value) || ('\0' == *value))
277                 return;
278
279         values[0].gauge = atof (value);
280         submit (db, type, type_instance, values, 1);
281         return;
282 } /* submit_gauge */
283
284 static int c_psql_check_connection (c_psql_database_t *db)
285 {
286         /* "ping" */
287         PQclear (PQexec (db->conn, "SELECT 42;"));
288
289         if (CONNECTION_OK != PQstatus (db->conn)) {
290                 PQreset (db->conn);
291
292                 /* trigger c_release() */
293                 if (0 == db->conn_complaint.interval)
294                         db->conn_complaint.interval = 1;
295
296                 if (CONNECTION_OK != PQstatus (db->conn)) {
297                         c_complain (LOG_ERR, &db->conn_complaint,
298                                         "Failed to connect to database %s: %s",
299                                         db->database, PQerrorMessage (db->conn));
300                         return -1;
301                 }
302         }
303
304         c_release (LOG_INFO, &db->conn_complaint,
305                         "Successfully reconnected to database %s", PQdb (db->conn));
306         return 0;
307 } /* c_psql_check_connection */
308
309 static int c_psql_exec_query (c_psql_database_t *db, int idx)
310 {
311         c_psql_query_t *query;
312         PGresult       *res;
313
314         int rows, cols;
315         int i;
316
317         if (idx >= db->queries_num)
318                 return -1;
319
320         query = db->queries[idx];
321
322         res = PQexec (db->conn, query->query);
323
324         if (PGRES_TUPLES_OK != PQresultStatus (res)) {
325                 log_err ("Failed to execute SQL query: %s",
326                                 PQerrorMessage (db->conn));
327                 log_info ("SQL query was: %s", query->query);
328                 PQclear (res);
329                 return -1;
330         }
331
332         rows = PQntuples (res);
333         if (1 > rows)
334                 return 0;
335
336         cols = PQnfields (res);
337         if (query->cols_num != cols) {
338                 log_err ("SQL query returned wrong number of fields "
339                                 "(expected: %i, got: %i)", query->cols_num, cols);
340                 log_info ("SQL query was: %s", query->query);
341                 return -1;
342         }
343
344         for (i = 0; i < rows; ++i) {
345                 int j;
346
347                 for (j = 0; j < cols; ++j) {
348                         c_psql_col_t col = query->cols[j];
349
350                         char *value = PQgetvalue (res, i, j);
351
352                         if (col.ds_type == DS_TYPE_COUNTER)
353                                 submit_counter (db, col.type, col.type_instance, value);
354                         else if (col.ds_type == DS_TYPE_GAUGE)
355                                 submit_gauge (db, col.type, col.type_instance, value);
356                 }
357         }
358         return 0;
359 } /* c_psql_exec_query */
360
361 static int c_psql_stat_database (c_psql_database_t *db)
362 {
363         const char *const query =
364                 "SELECT numbackends, xact_commit, xact_rollback "
365                         "FROM pg_stat_database "
366                         "WHERE datname = $1;";
367
368         PGresult *res;
369
370         int n;
371
372         res = PQexecParams (db->conn, query, /* number of parameters */ 1,
373                         NULL, (const char *const *)&db->database, NULL, NULL,
374                         /* return text data */ 0);
375
376         if (PGRES_TUPLES_OK != PQresultStatus (res)) {
377                 log_err ("Failed to execute SQL query: %s",
378                                 PQerrorMessage (db->conn));
379                 log_info ("SQL query was: %s", query);
380                 PQclear (res);
381                 return -1;
382         }
383
384         n = PQntuples (res);
385         if (1 < n) {
386                 log_warn ("pg_stat_database has more than one entry "
387                                 "for database %s - ignoring additional results.",
388                                 db->database);
389         }
390         else if (1 > n) {
391                 log_err ("pg_stat_database has no entry for database %s",
392                                 db->database);
393                 PQclear (res);
394                 return -1;
395         }
396
397         submit_gauge (db, "pg_numbackends", NULL,  PQgetvalue (res, 0, 0));
398
399         submit_counter (db, "pg_xact", "commit",   PQgetvalue (res, 0, 1));
400         submit_counter (db, "pg_xact", "rollback", PQgetvalue (res, 0, 2));
401
402         PQclear (res);
403         return 0;
404 } /* c_psql_stat_database */
405
406 static int c_psql_read (void)
407 {
408         int success = 0;
409         int i;
410
411         for (i = 0; i < databases_num; ++i) {
412                 c_psql_database_t *db = databases + i;
413
414                 int j;
415
416                 assert (NULL != db->database);
417
418                 if (0 != c_psql_check_connection (db))
419                         continue;
420
421                 c_psql_stat_database (db);
422
423                 for (j = 0; j < db->queries_num; ++j)
424                         c_psql_exec_query (db, j);
425
426                 ++success;
427         }
428
429         if (! success)
430                 return -1;
431         return 0;
432 } /* c_psql_read */
433
434 static int c_psql_shutdown (void)
435 {
436         int i;
437
438         if ((NULL == databases) || (0 == databases_num))
439                 return 0;
440
441         plugin_unregister_read ("postgresql");
442         plugin_unregister_shutdown ("postgresql");
443
444         for (i = 0; i < databases_num; ++i) {
445                 c_psql_database_t *db = databases + i;
446                 c_psql_database_delete (db);
447         }
448
449         sfree (databases);
450         databases_num = 0;
451
452         for (i = 0; i < queries_num; ++i) {
453                 c_psql_query_t *query = queries + i;
454                 c_psql_query_delete (query);
455         }
456
457         sfree (queries);
458         queries_num = 0;
459         return 0;
460 } /* c_psql_shutdown */
461
462 static int c_psql_init (void)
463 {
464         int i;
465
466         if ((NULL == databases) || (0 == databases_num))
467                 return 0;
468
469         for (i = 0; i < queries_num; ++i) {
470                 c_psql_query_t *query = queries + i;
471                 int j;
472
473                 for (j = 0; j < query->cols_num; ++j) {
474                         c_psql_col_t     *col = query->cols + j;
475                         const data_set_t *ds;
476
477                         ds = plugin_get_ds (col->type);
478                         if (NULL == ds) {
479                                 log_err ("Column: Unknown type \"%s\".", col->type);
480                                 c_psql_shutdown ();
481                                 return -1;
482                         }
483
484                         if (1 != ds->ds_num) {
485                                 log_err ("Column: Invalid type \"%s\" - types defining "
486                                                 "one data source are supported only (got: %i).",
487                                                 col->type, ds->ds_num);
488                                 c_psql_shutdown ();
489                                 return -1;
490                         }
491
492                         col->ds_type = ds->ds[0].type;
493                 }
494         }
495
496         for (i = 0; i < databases_num; ++i) {
497                 c_psql_database_t *db = databases + i;
498
499                 char  conninfo[4096];
500                 char *buf     = conninfo;
501                 int   buf_len = sizeof (conninfo);
502                 int   status;
503
504                 char *server_host;
505                 int   server_version;
506
507                 status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database);
508                 if (0 < status) {
509                         buf     += status;
510                         buf_len -= status;
511                 }
512
513                 C_PSQL_PAR_APPEND (buf, buf_len, "host",       db->host);
514                 C_PSQL_PAR_APPEND (buf, buf_len, "port",       db->port);
515                 C_PSQL_PAR_APPEND (buf, buf_len, "user",       db->user);
516                 C_PSQL_PAR_APPEND (buf, buf_len, "password",   db->password);
517                 C_PSQL_PAR_APPEND (buf, buf_len, "sslmode",    db->sslmode);
518                 C_PSQL_PAR_APPEND (buf, buf_len, "krbsrvname", db->krbsrvname);
519                 C_PSQL_PAR_APPEND (buf, buf_len, "service",    db->service);
520
521                 db->conn = PQconnectdb (conninfo);
522                 if (0 != c_psql_check_connection (db))
523                         continue;
524
525                 server_host    = PQhost (db->conn);
526                 server_version = PQserverVersion (db->conn);
527                 log_info ("Sucessfully connected to database %s (user %s) "
528                                 "at server %s%s%s (server version: %d.%d.%d, "
529                                 "protocol version: %d, pid: %d)",
530                                 PQdb (db->conn), PQuser (db->conn),
531                                 C_PSQL_SOCKET3 (server_host, PQport (db->conn)),
532                                 C_PSQL_SERVER_VERSION3 (server_version),
533                                 PQprotocolVersion (db->conn), PQbackendPID (db->conn));
534         }
535
536         plugin_register_read ("postgresql", c_psql_read);
537         plugin_register_shutdown ("postgresql", c_psql_shutdown);
538         return 0;
539 } /* c_psql_init */
540
541 static int config_set (char *name, char **var, const oconfig_item_t *ci)
542 {
543         if ((0 != ci->children_num) || (1 != ci->values_num)
544                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
545                 log_err ("%s expects a single string argument.", name);
546                 return 1;
547         }
548
549         sfree (*var);
550         *var = sstrdup (ci->values[0].value.string);
551         return 0;
552 } /* config_set */
553
554 static int config_set_column (c_psql_query_t *query, const oconfig_item_t *ci)
555 {
556         c_psql_col_t *col;
557
558         int i;
559
560         if ((0 != ci->children_num)
561                         || (1 > ci->values_num) || (2 < ci->values_num)) {
562                 log_err ("Column expects either one or two arguments.");
563                 return 1;
564         }
565
566         for (i = 0; i < ci->values_num; ++i) {
567                 if (OCONFIG_TYPE_STRING != ci->values[i].type) {
568                         log_err ("Column expects either one or two string arguments.");
569                         return 1;
570                 }
571         }
572
573         ++query->cols_num;
574         if (NULL == (query->cols = (c_psql_col_t *)realloc (query->cols,
575                                 query->cols_num * sizeof (*query->cols)))) {
576                 log_err ("Out of memory.");
577                 exit (5);
578         }
579
580         col = query->cols + query->cols_num - 1;
581
582         col->ds_type = -1;
583
584         col->type = sstrdup (ci->values[0].value.string);
585         col->type_instance = (2 == ci->values_num)
586                 ? sstrdup (ci->values[1].value.string) : NULL;
587         return 0;
588 } /* config_set_column */
589
590 static int config_set_query (c_psql_database_t *db, const oconfig_item_t *ci)
591 {
592         c_psql_query_t *query;
593
594         if ((0 != ci->children_num) || (1 != ci->values_num)
595                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
596                 log_err ("Query expects a single string argument.");
597                 return 1;
598         }
599
600         query = c_psql_query_get (ci->values[0].value.string);
601         if (NULL == query) {
602                 log_err ("Query \"%s\" not found - please check your configuration.",
603                                 ci->values[0].value.string);
604                 return 1;
605         }
606
607         ++db->queries_num;
608         if (NULL == (db->queries = (c_psql_query_t **)realloc (db->queries,
609                                 db->queries_num * sizeof (*db->queries)))) {
610                 log_err ("Out of memory.");
611                 exit (5);
612         }
613
614         db->queries[db->queries_num - 1] = query;
615         return 0;
616 } /* config_set_query */
617
618 static int c_psql_config_query (oconfig_item_t *ci)
619 {
620         c_psql_query_t *query;
621
622         int i;
623
624         if ((1 != ci->values_num)
625                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
626                 log_err ("<Query> expects a single string argument.");
627                 return 1;
628         }
629
630         query = c_psql_query_new (ci->values[0].value.string);
631
632         for (i = 0; i < ci->children_num; ++i) {
633                 oconfig_item_t *c = ci->children + i;
634
635                 if (0 == strcasecmp (c->key, "Query"))
636                         config_set ("Query", &query->query, c);
637                 else if (0 == strcasecmp (c->key, "Column"))
638                         config_set_column (query, c);
639                 else
640                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
641         }
642         return 0;
643 } /* c_psql_config_query */
644
645 static int c_psql_config_database (oconfig_item_t *ci)
646 {
647         c_psql_database_t *db;
648
649         int i;
650
651         if ((1 != ci->values_num)
652                         || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
653                 log_err ("<Database> expects a single string argument.");
654                 return 1;
655         }
656
657         db = c_psql_database_new (ci->values[0].value.string);
658
659         for (i = 0; i < ci->children_num; ++i) {
660                 oconfig_item_t *c = ci->children + i;
661
662                 if (0 == strcasecmp (c->key, "Host"))
663                         config_set ("Host", &db->host, c);
664                 else if (0 == strcasecmp (c->key, "Port"))
665                         config_set ("Port", &db->port, c);
666                 else if (0 == strcasecmp (c->key, "User"))
667                         config_set ("User", &db->user, c);
668                 else if (0 == strcasecmp (c->key, "Password"))
669                         config_set ("Password", &db->password, c);
670                 else if (0 == strcasecmp (c->key, "SSLMode"))
671                         config_set ("SSLMode", &db->sslmode, c);
672                 else if (0 == strcasecmp (c->key, "KRBSrvName"))
673                         config_set ("KRBSrvName", &db->krbsrvname, c);
674                 else if (0 == strcasecmp (c->key, "Service"))
675                         config_set ("Service", &db->service, c);
676                 else if (0 == strcasecmp (c->key, "Query"))
677                         config_set_query (db, c);
678                 else
679                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
680         }
681
682         if (NULL == db->queries) {
683                 db->queries = (c_psql_query_t **)malloc (def_queries_num
684                                 * sizeof (*db->queries));
685
686                 for (i = 0; i < def_queries_num; ++i) {
687                         db->queries[i] = c_psql_query_get (def_queries[i]);
688                         if (NULL == db->queries[i])
689                                 log_err ("Query \"%s\" not found - "
690                                                 "please check your installation.",
691                                                 def_queries[i]);
692                         else
693                                 ++db->queries_num;
694                 }
695         }
696         return 0;
697 }
698
699 static int c_psql_config (oconfig_item_t *ci)
700 {
701         static int have_def_config = 0;
702
703         int i;
704
705         if (0 == have_def_config) {
706                 oconfig_item_t *c;
707
708                 have_def_config = 1;
709
710                 c = oconfig_parse_file (C_PSQL_DEFAULT_CONF);
711                 if (NULL == c)
712                         log_err ("Failed to read default config ("C_PSQL_DEFAULT_CONF").");
713                 else
714                         c_psql_config (c);
715
716                 if (NULL == queries)
717                         log_err ("Default config ("C_PSQL_DEFAULT_CONF") did not define "
718                                         "any queries - please check your installation.");
719         }
720
721         for (i = 0; i < ci->children_num; ++i) {
722                 oconfig_item_t *c = ci->children + i;
723
724                 if (0 == strcasecmp (c->key, "Query"))
725                         c_psql_config_query (c);
726                 else if (0 == strcasecmp (c->key, "Database"))
727                         c_psql_config_database (c);
728                 else
729                         log_warn ("Ignoring unknown config key \"%s\".", c->key);
730         }
731         return 0;
732 } /* c_psql_config */
733
734 void module_register (void)
735 {
736         plugin_register_complex_config ("postgresql", c_psql_config);
737         plugin_register_init ("postgresql", c_psql_init);
738 } /* module_register */
739
740 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */
741