X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fpostgresql.c;h=e46a47110ad7978d5154796aaed4d7b0dbefea2c;hb=6518cae7c97b5bb1ac3e1d1eca841fe41b5cb656;hp=9f4894ca83b0f206a5901f441ce05d44e2c3f4d4;hpb=27b225794cf5774bc1c34ad431bbabd2cc3971b9;p=collectd.git diff --git a/src/postgresql.c b/src/postgresql.c index 9f4894ca..e46a4711 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -101,6 +101,7 @@ typedef enum { C_PSQL_PARAM_DB, C_PSQL_PARAM_USER, C_PSQL_PARAM_INTERVAL, + C_PSQL_PARAM_INSTANCE, } c_psql_param_t; /* Parameter configuration. Stored as `user data' in the query objects. */ @@ -147,6 +148,8 @@ typedef struct { char *user; char *password; + char *instance; + char *sslmode; char *krbsrvname; @@ -167,14 +170,14 @@ static char *def_queries[] = { }; static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); -static c_psql_database_t *databases = NULL; -static size_t databases_num = 0; +static c_psql_database_t **databases = NULL; +static size_t databases_num = 0; -static udb_query_t **queries = NULL; -static size_t queries_num = 0; +static udb_query_t **queries = NULL; +static size_t queries_num = 0; -static c_psql_writer_t *writers = NULL; -static size_t writers_num = 0; +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; static int c_psql_begin (c_psql_database_t *db) { @@ -203,7 +206,7 @@ static int c_psql_commit (c_psql_database_t *db) if (r != NULL) { if (PGRES_COMMAND_OK == PQresultStatus (r)) { - db->next_commit = cdtime () + db->commit_interval; + db->next_commit = 0; log_debug ("Successfully committed transaction."); status = 0; } @@ -217,17 +220,25 @@ static int c_psql_commit (c_psql_database_t *db) static c_psql_database_t *c_psql_database_new (const char *name) { - c_psql_database_t *db; + c_psql_database_t **tmp; + c_psql_database_t *db; - db = (c_psql_database_t *)realloc (databases, - (databases_num + 1) * sizeof (*db)); + db = (c_psql_database_t *)malloc (sizeof(*db)); if (NULL == db) { log_err ("Out of memory."); return NULL; } - databases = db; - db = databases + databases_num; + tmp = (c_psql_database_t **)realloc (databases, + (databases_num + 1) * sizeof (*databases)); + if (NULL == tmp) { + log_err ("Out of memory."); + sfree (db); + return NULL; + } + + databases = tmp; + databases[databases_num] = db; ++databases_num; db->conn = NULL; @@ -259,6 +270,8 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->user = NULL; db->password = NULL; + db->instance = sstrdup (name); + db->sslmode = NULL; db->krbsrvname = NULL; @@ -310,6 +323,8 @@ static void c_psql_database_delete (void *data) sfree (db->user); sfree (db->password); + sfree (db->instance); + sfree (db->sslmode); sfree (db->krbsrvname); @@ -317,7 +332,9 @@ static void c_psql_database_delete (void *data) sfree (db->service); /* don't care about freeing or reordering the 'databases' array - * this is done in 'shutdown' */ + * this is done in 'shutdown'; also, don't free the database instance + * object just to make sure that in case anybody accesses it before + * shutdown won't segfault */ return; } /* c_psql_database_delete */ @@ -364,9 +381,6 @@ static int c_psql_check_connection (c_psql_database_t *db) c_psql_connect (db); } - /* "ping" */ - PQclear (PQexec (db->conn, "SELECT 42;")); - if (CONNECTION_OK != PQstatus (db->conn)) { PQreset (db->conn); @@ -376,8 +390,9 @@ static int c_psql_check_connection (c_psql_database_t *db) if (CONNECTION_OK != PQstatus (db->conn)) { c_complain (LOG_ERR, &db->conn_complaint, - "Failed to connect to database %s: %s", - db->database, PQerrorMessage (db->conn)); + "Failed to connect to database %s (%s): %s", + db->database, db->instance, + PQerrorMessage (db->conn)); return -1; } @@ -442,9 +457,13 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, case C_PSQL_PARAM_INTERVAL: ssnprintf (interval, sizeof (interval), "%.3f", (db->interval > 0) - ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g); + ? CDTIME_T_TO_DOUBLE (db->interval) + : plugin_get_interval ()); params[i] = interval; break; + case C_PSQL_PARAM_INSTANCE: + params[i] = db->instance; + break; default: assert (0); } @@ -483,9 +502,10 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, else if ((NULL == data) || (0 == data->params_num)) res = c_psql_exec_query_noparams (db, q); else { - log_err ("Connection to database \"%s\" does not support parameters " - "(protocol version %d) - cannot execute query \"%s\".", - db->database, db->proto_version, + log_err ("Connection to database \"%s\" (%s) does not support " + "parameters (protocol version %d) - " + "cannot execute query \"%s\".", + db->database, db->instance, db->proto_version, udb_query_get_name (q)); return -1; } @@ -501,6 +521,12 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, if (PGRES_TUPLES_OK != PQresultStatus (res)) { pthread_mutex_lock (&db->db_lock); + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + PQclear (res); + return c_psql_exec_query (db, q, prep_area); + } + log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); log_info ("SQL query was: %s", @@ -551,7 +577,7 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, host = db->host; status = udb_query_prepare_result (q, prep_area, host, "postgresql", - db->database, column_names, (size_t) column_num, db->interval); + db->instance, column_names, (size_t) column_num, db->interval); if (0 != status) { log_err ("udb_query_prepare_result failed with status %i.", status); @@ -602,6 +628,7 @@ static int c_psql_read (user_data_t *ud) db = ud->data; assert (NULL != db->database); + assert (NULL != db->instance); assert (NULL != db->queries); pthread_mutex_lock (&db->db_lock); @@ -748,7 +775,7 @@ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, if (ds->ds[i].type == DS_TYPE_GAUGE) status = ssnprintf (str_ptr, str_len, - ",%f", vl->values[i].gauge); + ","GAUGE_FORMAT, vl->values[i].gauge); else if (store_rates) { if (rates == NULL) rates = uc_get_rate (ds, vl); @@ -886,10 +913,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK != PQresultStatus (res)) && (PGRES_TUPLES_OK != PQresultStatus (res))) { + PQclear (res); + if ((CONNECTION_OK != PQstatus (db->conn)) && (0 == c_psql_check_connection (db))) { - PQclear (res); - /* try again */ res = PQexecParams (db->conn, writer->statement, STATIC_ARRAY_SIZE (params), NULL, @@ -898,6 +925,7 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK == PQresultStatus (res)) || (PGRES_TUPLES_OK == PQresultStatus (res))) { + PQclear (res); success = 1; continue; } @@ -913,19 +941,19 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, /* this will abort any current transaction -> restart */ if (db->next_commit > 0) - if (c_psql_commit (db) == 0) - c_psql_begin (db); + c_psql_commit (db); pthread_mutex_unlock (&db->db_lock); return -1; } + + PQclear (res); success = 1; } if ((db->next_commit > 0) && (cdtime () > db->next_commit)) - if (c_psql_commit (db) == 0) - c_psql_begin (db); + c_psql_commit (db); pthread_mutex_unlock (&db->db_lock); @@ -941,24 +969,23 @@ static int c_psql_flush (cdtime_t timeout, __attribute__((unused)) const char *ident, user_data_t *ud) { - c_psql_database_t *dbs = databases; + c_psql_database_t **dbs = databases; size_t dbs_num = databases_num; size_t i; if ((ud != NULL) && (ud->data != NULL)) { - dbs = ud->data; + dbs = (void *)&ud->data; dbs_num = 1; } for (i = 0; i < dbs_num; ++i) { - c_psql_database_t *db = dbs + i; + c_psql_database_t *db = dbs[i]; /* don't commit if the timeout is larger than the regular commit * interval as in that case all requested data has already been * committed */ if ((db->next_commit > 0) && (db->commit_interval > timeout)) - if (c_psql_commit (db) == 0) - c_psql_begin (db); + c_psql_commit (db); } return 0; } /* c_psql_flush */ @@ -972,7 +999,7 @@ static int c_psql_shutdown (void) plugin_unregister_read_group ("postgresql"); for (i = 0; i < databases_num; ++i) { - c_psql_database_t *db = databases + i; + c_psql_database_t *db = databases[i]; if (db->writers_num > 0) { char cb_name[DATA_MAX_NAME_LEN]; @@ -987,6 +1014,8 @@ static int c_psql_shutdown (void) plugin_unregister_flush (cb_name); plugin_unregister_write (cb_name); } + + sfree (db); } udb_query_free (queries, queries_num); @@ -1013,17 +1042,19 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) data = udb_query_get_user_data (q); if (NULL == data) { - data = (c_psql_user_data_t *) smalloc (sizeof (*data)); + data = malloc (sizeof (*data)); if (NULL == data) { log_err ("Out of memory."); return -1; } memset (data, 0, sizeof (*data)); data->params = NULL; + data->params_num = 0; + + udb_query_set_user_data (q, data); } - tmp = (c_psql_param_t *) realloc (data->params, - (data->params_num + 1) * sizeof (c_psql_param_t)); + tmp = realloc (data->params, (data->params_num + 1) * sizeof (*data->params)); if (NULL == tmp) { log_err ("Out of memory."); return -1; @@ -1039,14 +1070,14 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) data->params[data->params_num] = C_PSQL_PARAM_USER; else if (0 == strcasecmp (param_str, "interval")) data->params[data->params_num] = C_PSQL_PARAM_INTERVAL; + else if (0 == strcasecmp (param_str, "instance")) + data->params[data->params_num] = C_PSQL_PARAM_INSTANCE; else { log_err ("Invalid parameter \"%s\".", param_str); return 1; } data->params_num++; - udb_query_set_user_data (q, data); - return (0); } /* config_query_param_add */ @@ -1130,7 +1161,7 @@ static int c_psql_config_writer (oconfig_item_t *ci) writers = tmp; writer = writers + writers_num; - ++writers_num; + memset (writer, 0, sizeof (*writer)); writer->name = sstrdup (ci->values[0].value.string); writer->statement = NULL; @@ -1150,10 +1181,10 @@ static int c_psql_config_writer (oconfig_item_t *ci) if (status != 0) { sfree (writer->statement); sfree (writer->name); - sfree (writer); return status; } + ++writers_num; return 0; } /* c_psql_config_writer */ @@ -1192,6 +1223,8 @@ static int c_psql_config_database (oconfig_item_t *ci) cf_util_get_string (c, &db->user); else if (0 == strcasecmp (c->key, "Password")) cf_util_get_string (c, &db->password); + else if (0 == strcasecmp (c->key, "Instance")) + cf_util_get_string (c, &db->instance); else if (0 == strcasecmp (c->key, "SSLMode")) cf_util_get_string (c, &db->sslmode); else if (0 == strcasecmp (c->key, "KRBSrvName")) @@ -1250,7 +1283,7 @@ static int c_psql_config_database (oconfig_item_t *ci) ud.data = db; ud.free_func = c_psql_database_delete; - ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database); + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance); if (db->queries_num > 0) { CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);