From: Florian Forster Date: Sat, 17 Nov 2012 08:39:33 +0000 (+0100) Subject: Merge branch 'sh/postgresql-writer' X-Git-Tag: collectd-5.2.0~13 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=5131a49ad0584aa22282aacf72b6e4ca75356bae;hp=d6d42818a8794a756727a6bb56e3680c7eefd6f9;p=collectd.git Merge branch 'sh/postgresql-writer' Conflicts: src/collectd.conf.pod src/postgresql.c --- diff --git a/contrib/postgresql/collectd_insert.sql b/contrib/postgresql/collectd_insert.sql new file mode 100644 index 00000000..fb44bb4f --- /dev/null +++ b/contrib/postgresql/collectd_insert.sql @@ -0,0 +1,234 @@ +-- collectd - contrib/postgresql/collectd_insert.sql +-- Copyright (C) 2012 Sebastian 'tokkee' Harl +-- All rights reserved. +-- +-- Redistribution and use in source and binary forms, with or without +-- modification, are permitted provided that the following conditions +-- are met: +-- +-- - Redistributions of source code must retain the above copyright +-- notice, this list of conditions and the following disclaimer. +-- +-- - Redistributions in binary form must reproduce the above copyright +-- notice, this list of conditions and the following disclaimer in the +-- documentation and/or other materials provided with the distribution. +-- +-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +-- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +-- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +-- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +-- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +-- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +-- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +-- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +-- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +-- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +-- POSSIBILITY OF SUCH DAMAGE. + +-- Description: +-- ------------ +-- +-- This is a sample database setup that may be used to write data collected by +-- collectd to a PostgreSQL database. We're using two tables, 'identifiers' +-- and 'values' to store the value-list identifier and the actual values +-- respectively. +-- +-- The 'values' table is partitioned to improve performance and maintainance. +-- Please note that additional maintainance scripts are required in order to +-- keep the setup running -- see the comments below for details. +-- +-- The function 'collectd_insert' may be used to actually insert values +-- submitted by collectd into those tables. +-- +-- Sample configuration: +-- --------------------- +-- +-- +-- +-- Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);" +-- +-- +-- # ... +-- Writer sqlstore +-- +-- + +CREATE TABLE identifiers ( + id integer NOT NULL, + host character varying(64) NOT NULL, + plugin character varying(64) NOT NULL, + plugin_inst character varying(64) DEFAULT NULL::character varying, + type character varying(64) NOT NULL, + type_inst character varying(64) DEFAULT NULL::character varying +); +CREATE SEQUENCE identifiers_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; +ALTER SEQUENCE identifiers_id_seq OWNED BY identifiers.id; +ALTER TABLE ONLY identifiers + ALTER COLUMN id SET DEFAULT nextval('identifiers_id_seq'::regclass); +ALTER TABLE ONLY identifiers + ADD CONSTRAINT identifiers_host_plugin_plugin_inst_type_type_inst_key + UNIQUE (host, plugin, plugin_inst, type, type_inst); +ALTER TABLE ONLY identifiers + ADD CONSTRAINT identifiers_pkey PRIMARY KEY (id); + +-- optionally, create indexes for the identifier fields +CREATE INDEX identifiers_host ON identifiers USING btree (host); +CREATE INDEX identifiers_plugin ON identifiers USING btree (plugin); +CREATE INDEX identifiers_plugin_inst ON identifiers USING btree (plugin_inst); +CREATE INDEX identifiers_type ON identifiers USING btree (type); +CREATE INDEX identifiers_type_inst ON identifiers USING btree (type_inst); + +CREATE TABLE "values" ( + id integer NOT NULL, + tstamp timestamp with time zone NOT NULL, + name character varying(64) NOT NULL, + value double precision NOT NULL +); + +CREATE OR REPLACE VIEW collectd + AS SELECT host, plugin, plugin_inst, type, type_inst, + host + || '/' || plugin + || CASE + WHEN plugin_inst IS NOT NULL THEN '-' + ELSE '' + END + || coalesce(plugin_inst, '') + || '/' || type + || CASE + WHEN type_inst IS NOT NULL THEN '-' + ELSE '' + END + || coalesce(plugin_inst, '') AS identifier, + tstamp, name, value + FROM identifiers + JOIN values + ON values.id = identifiers.id; + +-- partition "values" by day (or week, month, ...) + +-- create the child tables for today and the next 'days' days: +-- this may, for example, be used in a daily cron-job (or similar) to create +-- the tables for the next couple of days +CREATE OR REPLACE FUNCTION values_update_childs( + integer + ) RETURNS SETOF text + LANGUAGE plpgsql + AS $_$ +DECLARE + days alias for $1; + cur_day date; + next_day date; + i integer; +BEGIN + IF days < 1 THEN + RAISE EXCEPTION 'Cannot have negative number of days'; + END IF; + + i := 0; + LOOP + EXIT WHEN i > days; + + SELECT CAST ('now'::date + i * '1day'::interval AS date) INTO cur_day; + SELECT CAST ('now'::date + (i + 1) * '1day'::interval AS date) INTO next_day; + + i := i + 1; + + BEGIN + EXECUTE 'CREATE TABLE "values$' || cur_day || '" ( + CHECK (tstamp >= TIMESTAMP ''' || cur_day || ''' ' + || 'AND tstamp < TIMESTAMP ''' || next_day || ''') + ) INHERITS (values)'; + EXCEPTION WHEN duplicate_table THEN + CONTINUE; + END; + + RETURN NEXT 'values$' || cur_day::text; + + EXECUTE 'ALTER TABLE ONLY "values$' || cur_day || '" + ADD CONSTRAINT "values_' || cur_day || '_pkey" + PRIMARY KEY (id, tstamp, name, value)'; + EXECUTE 'ALTER TABLE ONLY "values$' || cur_day || '" + ADD CONSTRAINT "values_' || cur_day || '_id_fkey" + FOREIGN KEY (id) REFERENCES identifiers(id)'; + END LOOP; + RETURN; +END; +$_$; + +-- create initial child tables +SELECT values_update_childs(2); + +CREATE OR REPLACE FUNCTION values_insert_trigger() + RETURNS trigger + LANGUAGE plpgsql + AS $_$ +DECLARE + child_tbl character varying; +BEGIN + SELECT 'values$' || CAST (NEW.tstamp AS DATE) INTO child_tbl; + -- Rather than using 'EXECUTE', some if-cascade checking the date may also + -- be used. However, this would require frequent updates of the trigger + -- function while this example works automatically. + EXECUTE 'INSERT INTO "' || child_tbl || '" VALUES ($1.*)' USING NEW; + RETURN NULL; +END; +$_$; + +CREATE TRIGGER insert_values_trigger + BEFORE INSERT ON values + FOR EACH ROW EXECUTE PROCEDURE values_insert_trigger(); + +-- when querying values make sure to enable constraint exclusion +-- SET constraint_exclusion = on; + +CREATE OR REPLACE FUNCTION collectd_insert( + timestamp with time zone, character varying, + character varying, character varying, + character varying, character varying, + character varying[], character varying[], double precision[] + ) RETURNS void + LANGUAGE plpgsql + AS $_$ +DECLARE + p_time alias for $1; + p_host alias for $2; + p_plugin alias for $3; + p_plugin_instance alias for $4; + p_type alias for $5; + p_type_instance alias for $6; + p_value_names alias for $7; + -- don't use the type info; for 'StoreRates true' it's 'gauge' anyway + -- p_type_names alias for $8; + p_values alias for $9; + ds_id integer; + i integer; +BEGIN + SELECT id INTO ds_id + FROM identifiers + WHERE host = p_host + AND plugin = p_plugin + AND COALESCE(plugin_inst, '') = COALESCE(p_plugin_instance, '') + AND type = p_type + AND COALESCE(type_inst, '') = COALESCE(p_type_instance, ''); + IF NOT FOUND THEN + INSERT INTO identifiers (host, plugin, plugin_inst, type, type_inst) + VALUES (p_host, p_plugin, p_plugin_instance, p_type, p_type_instance) + RETURNING id INTO ds_id; + END IF; + i := 1; + LOOP + EXIT WHEN i > array_upper(p_value_names, 1); + INSERT INTO values (id, tstamp, name, value) + VALUES (ds_id, p_time, p_value_names[i], p_values[i]); + i := i + 1; + END LOOP; +END; +$_$; + +-- vim: set expandtab : diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 9d098f12..417af0d7 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -3627,6 +3627,13 @@ which are available in a PostgreSQL database or use future or special statistics provided by PostgreSQL without the need to upgrade your collectd installation. +Starting with version 5.2, the C plugin supports writing data to +PostgreSQL databases as well. This has been implemented in a generic way. You +need to specify an SQL statement which will then be executed by collectd in +order to write the data (see below for details). The benefit of that approach +is that there is no fixed database layout. Rather, the layout may be optimized +for the current setup. + The B manual can be found at L. @@ -3656,6 +3663,11 @@ L. + + Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);" + StoreRates true + + Host "hostname" Port "5432" @@ -3672,6 +3684,12 @@ L. Query backend # predefined Query rt36_tickets + + + # ... + Writer sqlstore + CommitInterval 10 + The B block defines one database query which may later be used by a @@ -3855,6 +3873,84 @@ non-by_table queries above. =back +The B block defines a PostgreSQL writer backend. It accepts a single +mandatory argument specifying the name of the writer. This will then be used +in the B specification in order to activate the writer instance. The +names of all writers have to be unique. The following options may be +specified: + +=over 4 + +=item B I + +This mandatory option specifies the SQL statement that will be executed for +each submitted value. A single SQL statement is allowed only. Anything after +the first semicolon will be ignored. + +Nine parameters will be passed to the statement and should be specified as +tokens B<$1>, B<$2>, through B<$9> in the statement string. The following +values are made available through those parameters: + +=over 4 + +=item B<$1> + +The timestamp of the queried value as a floating point number. + +=item B<$2> + +The hostname of the queried value. + +=item B<$3> + +The plugin name of the queried value. + +=item B<$4> + +The plugin instance of the queried value. This value may be B if there +is no plugin instance. + +=item B<$5> + +The type of the queried value (cf. L). + +=item B<$6> + +The type instance of the queried value. This value may be B if there is +no type instance. + +=item B<$7> + +An array of names for the submitted values (i.Ee., the name of the data +sources of the submitted value-list). + +=item B<$8> + +An array of types for the submitted values (i.Ee., the type of the data +sources of the submitted value-list; C, C, ...). Note, that if +B is enabled (which is the default, see below), all types will be +C. + +=item B<$9> + +An array of the submitted values. The dimensions of the value name and value +arrays match. + +=back + +In general, it is advisable to create and call a custom function in the +PostgreSQL database for this purpose. Any procedural language supported by +PostgreSQL will do (see chapter "Server Programming" in the PostgreSQL manual +for details). + +=item B B|B + +If set to B (the default), convert counter values to rates. If set to +B counter values are stored as is, i.Ee. as an increasing integer +number. + +=back + The B block defines one PostgreSQL database for which to collect statistics. It accepts a single mandatory argument which specifies the database name. None of the other options are required. PostgreSQL will use @@ -3870,6 +3966,17 @@ for details. Specify the interval with which the database should be queried. The default is to use the global B setting. +=item B I + +This option may be used for database connections which have "writers" assigned +(see above). If specified, it causes a writer to put several updates into a +single transaction. This transaction will last for the specified amount of +time. By default, each update will be executed in a separate transaction. Each +transaction generates a fair amount of overhead which can, thus, be reduced by +activating this option. The draw-back is, that data covering the specified +amount of time will be lost, for example, if a single statement within the +transaction fails or if the database server crashes. + =item B I Specify the hostname or IP of the PostgreSQL server to connect to. If the @@ -3942,11 +4049,36 @@ B for details. =item B I -Specify a I which should be executed for the database connection. This -may be any of the predefined or user-defined queries. If no such option is -given, it defaults to "backends", "transactions", "queries", "query_plans", -"table_states", "disk_io" and "disk_usage". Else, the specified queries are -used only. +Specifies a I which should be executed in the context of the database +connection. This may be any of the predefined or user-defined queries. If no +such option is given, it defaults to "backends", "transactions", "queries", +"query_plans", "table_states", "disk_io" and "disk_usage" (unless a B +has been specified). Else, the specified queries are used only. + +=item B I + +Assigns the specified I backend to the database connection. This +causes all collected data to be send to the database using the settings +defined in the writer configuration (see the section "FILTER CONFIGURATION" +below for details on how to selectively send data to certain plugins). + +Each writer will register a flush callback which may be used when having long +transactions enabled (see the B option above). When issuing +the B command (see L for details) the current +transaction will be committed right away. Two different kinds of flush +callbacks are available with the C plugin: + +=over 4 + +=item B + +Flush all writer backends. + +=item BI + +Flush all writers of the specified I only. + +=back =back diff --git a/src/postgresql.c b/src/postgresql.c index a72109ae..15d4666b 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -1,7 +1,7 @@ /** * collectd - src/postgresql.c - * Copyright (C) 2008, 2009 Sebastian Harl - * Copyright (C) 2009 Florian Forster + * Copyright (C) 2008-2012 Sebastian Harl + * Copyright (C) 2009 Florian Forster * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -42,15 +42,21 @@ #include "configfile.h" #include "plugin.h" +#include "utils_cache.h" #include "utils_db_query.h" #include "utils_complain.h" +#if HAVE_PTHREAD_H +# include +#endif + #include #include #define log_err(...) ERROR ("postgresql: " __VA_ARGS__) #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__) #define log_info(...) INFO ("postgresql: " __VA_ARGS__) +#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__) #ifndef C_PSQL_DEFAULT_CONF # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf" @@ -105,6 +111,12 @@ typedef struct { } c_psql_user_data_t; typedef struct { + char *name; + char *statement; + _Bool store_rates; +} c_psql_writer_t; + +typedef struct { PGconn *conn; c_complain_t conn_complaint; @@ -118,8 +130,18 @@ typedef struct { udb_query_t **queries; size_t queries_num; + c_psql_writer_t **writers; + size_t writers_num; + + /* make sure we don't access the database object in parallel */ + pthread_mutex_t db_lock; + cdtime_t interval; + /* writer "caching" settings */ + cdtime_t commit_interval; + cdtime_t next_commit; + char *host; char *port; char *database; @@ -133,6 +155,8 @@ typedef struct { char *krbsrvname; char *service; + + int ref_cnt; } c_psql_database_t; static char *def_queries[] = { @@ -146,19 +170,69 @@ static char *def_queries[] = { }; static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); +static c_psql_database_t *databases = NULL; +static size_t databases_num = 0; + static udb_query_t **queries = NULL; static size_t queries_num = 0; +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; + +static int c_psql_begin (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "BEGIN"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = cdtime() + db->commit_interval; + status = 0; + } + else + log_warn ("Failed to initiate ('BEGIN') transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_begin */ + +static int c_psql_commit (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "COMMIT"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = 0; + log_debug ("Successfully committed transaction."); + status = 0; + } + else + log_warn ("Failed to commit transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_commit */ + static c_psql_database_t *c_psql_database_new (const char *name) { c_psql_database_t *db; - db = (c_psql_database_t *)malloc (sizeof (*db)); + db = (c_psql_database_t *)realloc (databases, + (databases_num + 1) * sizeof (*db)); if (NULL == db) { log_err ("Out of memory."); return NULL; } + databases = db; + db = databases + databases_num; + ++databases_num; + db->conn = NULL; C_COMPLAIN_INIT (&db->conn_complaint); @@ -172,8 +246,16 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->queries = NULL; db->queries_num = 0; + db->writers = NULL; + db->writers_num = 0; + + pthread_mutex_init (&db->db_lock, /* attrs = */ NULL); + db->interval = 0; + db->commit_interval = 0; + db->next_commit = 0; + db->database = sstrdup (name); db->host = NULL; db->port = NULL; @@ -187,6 +269,8 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->krbsrvname = NULL; db->service = NULL; + + db->ref_cnt = 0; return db; } /* c_psql_database_new */ @@ -196,6 +280,17 @@ static void c_psql_database_delete (void *data) c_psql_database_t *db = data; + --db->ref_cnt; + /* readers and writers may access this database */ + if (db->ref_cnt > 0) + return; + + /* wait for the lock to be released by the last writer */ + pthread_mutex_lock (&db->db_lock); + + if (db->next_commit > 0) + c_psql_commit (db); + PQfinish (db->conn); db->conn = NULL; @@ -207,6 +302,13 @@ static void c_psql_database_delete (void *data) sfree (db->queries); db->queries_num = 0; + sfree (db->writers); + db->writers_num = 0; + + pthread_mutex_unlock (&db->db_lock); + + pthread_mutex_destroy (&db->db_lock); + sfree (db->database); sfree (db->host); sfree (db->port); @@ -220,6 +322,9 @@ static void c_psql_database_delete (void *data) sfree (db->krbsrvname); sfree (db->service); + + /* don't care about freeing or reordering the 'databases' array + * this is done in 'shutdown' */ return; } /* c_psql_database_delete */ @@ -230,7 +335,7 @@ static int c_psql_connect (c_psql_database_t *db) int buf_len = sizeof (conninfo); int status; - if (! db) + if ((! db) || (! db->database)) return -1; status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database); @@ -363,6 +468,7 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, NULL, NULL, /* return text data */ 0); } /* c_psql_exec_query_params */ +/* db->db_lock must be locked when calling this function */ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, udb_query_preparation_area_t *prep_area) { @@ -397,23 +503,32 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, return -1; } + /* give c_psql_write() a chance to acquire the lock if called recursively + * through dispatch_values(); this will happen if, both, queries and + * writers are configured for a single connection */ + pthread_mutex_unlock (&db->db_lock); + column_names = NULL; column_values = NULL; -#define BAIL_OUT(status) \ - sfree (column_names); \ - sfree (column_values); \ - PQclear (res); \ - return status - if (PGRES_TUPLES_OK != PQresultStatus (res)) { + pthread_mutex_lock (&db->db_lock); + log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); log_info ("SQL query was: %s", udb_query_get_statement (q)); - BAIL_OUT (-1); + PQclear (res); + return -1; } +#define BAIL_OUT(status) \ + sfree (column_names); \ + sfree (column_values); \ + PQclear (res); \ + pthread_mutex_lock (&db->db_lock); \ + return status + rows_num = PQntuples (res); if (1 > rows_num) { BAIL_OUT (0); @@ -501,9 +616,14 @@ static int c_psql_read (user_data_t *ud) assert (NULL != db->database); assert (NULL != db->instance); + assert (NULL != db->queries); + + pthread_mutex_lock (&db->db_lock); - if (0 != c_psql_check_connection (db)) + if (0 != c_psql_check_connection (db)) { + pthread_mutex_unlock (&db->db_lock); return -1; + } for (i = 0; i < db->queries_num; ++i) { @@ -521,19 +641,377 @@ static int c_psql_read (user_data_t *ud) success = 1; } + pthread_mutex_unlock (&db->db_lock); + if (! success) return -1; return 0; } /* c_psql_read */ +static char *values_name_to_sqlarray (const data_set_t *ds, + char *string, size_t string_len) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < ds->ds_num; ++i) { + int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name); + + if (status < 1) + return NULL; + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value names"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_name_to_sqlarray */ + +static char *values_type_to_sqlarray (const data_set_t *ds, + char *string, size_t string_len, _Bool store_rates) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < ds->ds_num; ++i) { + int status; + + if (store_rates) + status = ssnprintf(str_ptr, str_len, ",'gauge'"); + else + status = ssnprintf(str_ptr, str_len, ",'%s'", + DS_TYPE_TO_STRING (ds->ds[i].type)); + + if (status < 1) { + str_len = 0; + break; + } + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value types"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_type_to_sqlarray */ + +static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, + char *string, size_t string_len, _Bool store_rates) +{ + char *str_ptr; + size_t str_len; + + gauge_t *rates = NULL; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < vl->values_len; ++i) { + int status = 0; + + if ((ds->ds[i].type != DS_TYPE_GAUGE) + && (ds->ds[i].type != DS_TYPE_COUNTER) + && (ds->ds[i].type != DS_TYPE_DERIVE) + && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { + log_err ("c_psql_write: Unknown data source type: %i", + ds->ds[i].type); + sfree (rates); + return NULL; + } + + if (ds->ds[i].type == DS_TYPE_GAUGE) + status = ssnprintf (str_ptr, str_len, + ",%f", vl->values[i].gauge); + else if (store_rates) { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + + if (rates == NULL) { + log_err ("c_psql_write: Failed to determine rate"); + return NULL; + } + + status = ssnprintf (str_ptr, str_len, + ",%lf", rates[i]); + } + else if (ds->ds[i].type == DS_TYPE_COUNTER) + status = ssnprintf (str_ptr, str_len, + ",%llu", vl->values[i].counter); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + status = ssnprintf (str_ptr, str_len, + ",%"PRIi64, vl->values[i].derive); + else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) + status = ssnprintf (str_ptr, str_len, + ",%"PRIu64, vl->values[i].absolute); + + if (status < 1) { + str_len = 0; + break; + } + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + sfree (rates); + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value list"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_to_sqlarray */ + +static int c_psql_write (const data_set_t *ds, const value_list_t *vl, + user_data_t *ud) +{ + c_psql_database_t *db; + + char time_str[32]; + char values_name_str[1024]; + char values_type_str[1024]; + char values_str[1024]; + + const char *params[9]; + + int success = 0; + int i; + + if ((ud == NULL) || (ud->data == NULL)) { + log_err ("c_psql_write: Invalid user data."); + return -1; + } + + db = ud->data; + assert (db->database != NULL); + assert (db->writers != NULL); + + if (cdtime_to_iso8601 (time_str, sizeof (time_str), vl->time) == 0) { + log_err ("c_psql_write: Failed to convert time to ISO 8601 format"); + return -1; + } + + if (values_name_to_sqlarray (ds, + values_name_str, sizeof (values_name_str)) == NULL) + return -1; + +#define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v)) + + params[0] = time_str; + params[1] = vl->host; + params[2] = vl->plugin; + params[3] = VALUE_OR_NULL(vl->plugin_instance); + params[4] = vl->type; + params[5] = VALUE_OR_NULL(vl->type_instance); + params[6] = values_name_str; + +#undef VALUE_OR_NULL + + pthread_mutex_lock (&db->db_lock); + + if (0 != c_psql_check_connection (db)) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + if ((db->commit_interval > 0) + && (db->next_commit == 0)) + c_psql_begin (db); + + for (i = 0; i < db->writers_num; ++i) { + c_psql_writer_t *writer; + PGresult *res; + + writer = db->writers[i]; + + if (values_type_to_sqlarray (ds, + values_type_str, sizeof (values_type_str), + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + if (values_to_sqlarray (ds, vl, + values_str, sizeof (values_str), + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + params[7] = values_type_str; + params[8] = values_str; + + res = PQexecParams (db->conn, writer->statement, + STATIC_ARRAY_SIZE (params), NULL, + (const char *const *)params, + NULL, NULL, /* return text data */ 0); + + if ((PGRES_COMMAND_OK != PQresultStatus (res)) + && (PGRES_TUPLES_OK != PQresultStatus (res))) { + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + PQclear (res); + + /* try again */ + res = PQexecParams (db->conn, writer->statement, + STATIC_ARRAY_SIZE (params), NULL, + (const char *const *)params, + NULL, NULL, /* return text data */ 0); + + if ((PGRES_COMMAND_OK == PQresultStatus (res)) + || (PGRES_TUPLES_OK == PQresultStatus (res))) { + success = 1; + continue; + } + } + + log_err ("Failed to execute SQL query: %s", + PQerrorMessage (db->conn)); + log_info ("SQL query was: '%s', " + "params: %s, %s, %s, %s, %s, %s, %s, %s", + writer->statement, + params[0], params[1], params[2], params[3], + params[4], params[5], params[6], params[7]); + + /* this will abort any current transaction -> restart */ + if (db->next_commit > 0) + c_psql_commit (db); + + pthread_mutex_unlock (&db->db_lock); + return -1; + } + success = 1; + } + + if ((db->next_commit > 0) + && (cdtime () > db->next_commit)) + c_psql_commit (db); + + pthread_mutex_unlock (&db->db_lock); + + if (! success) + return -1; + return 0; +} /* c_psql_write */ + +/* We cannot flush single identifiers as all we do is to commit the currently + * running transaction, thus making sure that all written data is actually + * visible to everybody. */ +static int c_psql_flush (cdtime_t timeout, + __attribute__((unused)) const char *ident, + user_data_t *ud) +{ + c_psql_database_t *dbs = databases; + size_t dbs_num = databases_num; + size_t i; + + if ((ud != NULL) && (ud->data != NULL)) { + dbs = ud->data; + dbs_num = 1; + } + + for (i = 0; i < dbs_num; ++i) { + c_psql_database_t *db = dbs + i; + + /* don't commit if the timeout is larger than the regular commit + * interval as in that case all requested data has already been + * committed */ + if ((db->next_commit > 0) && (db->commit_interval > timeout)) + c_psql_commit (db); + } + return 0; +} /* c_psql_flush */ + static int c_psql_shutdown (void) { + size_t i = 0; + + _Bool had_flush = 0; + plugin_unregister_read_group ("postgresql"); + for (i = 0; i < databases_num; ++i) { + c_psql_database_t *db = databases + i; + + if (db->writers_num > 0) { + char cb_name[DATA_MAX_NAME_LEN]; + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", + db->database); + + if (! had_flush) { + plugin_unregister_flush ("postgresql"); + had_flush = 1; + } + + plugin_unregister_flush (cb_name); + plugin_unregister_write (cb_name); + } + } + udb_query_free (queries, queries_num); queries = NULL; queries_num = 0; + sfree (writers); + writers = NULL; + writers_num = 0; + + sfree (databases); + databases = NULL; + databases_num = 0; + return 0; } /* c_psql_shutdown */ @@ -595,6 +1073,103 @@ static int config_query_callback (udb_query_t *q, oconfig_item_t *ci) return (-1); } /* config_query_callback */ +static int config_add_writer (oconfig_item_t *ci, + c_psql_writer_t *src_writers, size_t src_writers_num, + c_psql_writer_t ***dst_writers, size_t *dst_writers_num) +{ + char *name; + + size_t i; + + if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL)) + return -1; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { + log_err ("`Writer' expects a single string argument."); + return 1; + } + + name = ci->values[0].value.string; + + for (i = 0; i < src_writers_num; ++i) { + c_psql_writer_t **tmp; + + if (strcasecmp (name, src_writers[i].name) != 0) + continue; + + tmp = (c_psql_writer_t **)realloc (*dst_writers, + sizeof (**dst_writers) * (*dst_writers_num + 1)); + if (tmp == NULL) { + log_err ("Out of memory."); + return -1; + } + + tmp[*dst_writers_num] = src_writers + i; + + *dst_writers = tmp; + ++(*dst_writers_num); + break; + } + + if (i >= src_writers_num) { + log_err ("No such writer: `%s'", name); + return -1; + } + + return 0; +} /* config_add_writer */ + +static int c_psql_config_writer (oconfig_item_t *ci) +{ + c_psql_writer_t *writer; + c_psql_writer_t *tmp; + + int status = 0; + int i; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { + log_err (" expects a single string argument."); + return 1; + } + + tmp = (c_psql_writer_t *)realloc (writers, + sizeof (*writers) * (writers_num + 1)); + if (tmp == NULL) { + log_err ("Out of memory."); + return -1; + } + + writers = tmp; + writer = writers + writers_num; + ++writers_num; + + writer->name = sstrdup (ci->values[0].value.string); + writer->statement = NULL; + writer->store_rates = 1; + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *c = ci->children + i; + + if (strcasecmp ("Statement", c->key) == 0) + status = cf_util_get_string (c, &writer->statement); + else if (strcasecmp ("StoreRates", c->key) == 0) + status = cf_util_get_boolean (c, &writer->store_rates); + else + log_warn ("Ignoring unknown config key \"%s\".", c->key); + } + + if (status != 0) { + sfree (writer->statement); + sfree (writer->name); + sfree (writer); + return status; + } + + return 0; +} /* c_psql_config_writer */ + static int c_psql_config_database (oconfig_item_t *ci) { c_psql_database_t *db; @@ -603,6 +1178,8 @@ static int c_psql_config_database (oconfig_item_t *ci) struct timespec cb_interval = { 0, 0 }; user_data_t ud; + static _Bool have_flush = 0; + int i; if ((1 != ci->values_num) @@ -639,14 +1216,19 @@ static int c_psql_config_database (oconfig_item_t *ci) else if (0 == strcasecmp (c->key, "Query")) udb_query_pick_from_list (c, queries, queries_num, &db->queries, &db->queries_num); + else if (0 == strcasecmp (c->key, "Writer")) + config_add_writer (c, writers, writers_num, + &db->writers, &db->writers_num); else if (0 == strcasecmp (c->key, "Interval")) cf_util_get_cdtime (c, &db->interval); + else if (strcasecmp ("CommitInterval", c->key) == 0) + cf_util_get_cdtime (c, &db->commit_interval); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } /* If no `Query' options were given, add the default queries.. */ - if (db->queries_num == 0) { + if ((db->queries_num == 0) && (db->writers_num == 0)){ for (i = 0; i < def_queries_num; i++) udb_query_pick_from_list_by_name (def_queries[i], queries, queries_num, @@ -685,11 +1267,34 @@ static int c_psql_config_database (oconfig_item_t *ci) ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance); - CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); + if (db->queries_num > 0) { + CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); - plugin_register_complex_read ("postgresql", cb_name, c_psql_read, - /* interval = */ (db->interval > 0) ? &cb_interval : NULL, - &ud); + ++db->ref_cnt; + plugin_register_complex_read ("postgresql", cb_name, c_psql_read, + /* interval = */ (db->interval > 0) ? &cb_interval : NULL, + &ud); + } + if (db->writers_num > 0) { + ++db->ref_cnt; + plugin_register_write (cb_name, c_psql_write, &ud); + + if (! have_flush) { + /* flush all */ + plugin_register_flush ("postgresql", + c_psql_flush, /* user data = */ NULL); + have_flush = 1; + } + + /* flush this connection only */ + ++db->ref_cnt; + plugin_register_flush (cb_name, c_psql_flush, &ud); + } + else if (db->commit_interval > 0) { + log_warn ("Database '%s': You do not have any writers assigned to " + "this database connection. Setting 'CommitInterval' does " + "not have any effect.", db->database); + } return 0; } /* c_psql_config_database */ @@ -721,6 +1326,8 @@ static int c_psql_config (oconfig_item_t *ci) if (0 == strcasecmp (c->key, "Query")) udb_query_create (&queries, &queries_num, c, /* callback = */ config_query_callback); + else if (0 == strcasecmp (c->key, "Writer")) + c_psql_config_writer (c); else if (0 == strcasecmp (c->key, "Database")) c_psql_config_database (c); else diff --git a/src/utils_time.c b/src/utils_time.c index aac6135e..6789758d 100644 --- a/src/utils_time.c +++ b/src/utils_time.c @@ -61,4 +61,39 @@ cdtime_t cdtime (void) /* {{{ */ } /* }}} cdtime_t cdtime */ #endif +size_t cdtime_to_iso8601 (char *s, size_t max, cdtime_t t) /* {{{ */ +{ + struct timespec t_spec; + struct tm t_tm; + + size_t len; + + CDTIME_T_TO_TIMESPEC (t, &t_spec); + NORMALIZE_TIMESPEC (t_spec); + + if (localtime_r ((time_t *)&t_spec.tv_sec, &t_tm) == NULL) { + char errbuf[1024]; + ERROR ("cdtime_to_iso8601: localtime_r failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (0); + } + + len = strftime (s, max, "%Y-%m-%dT%H:%M:%S", &t_tm); + if (len == 0) + return 0; + + if (max - len > 2) { + int n = snprintf (s + len, max - len, ".%09i", (int)t_spec.tv_nsec); + len += (n < max - len) ? n : max - len; + } + + if (max - len > 3) { + int n = strftime (s + len, max - len, "%z", &t_tm); + len += (n < max - len) ? n : max - len; + } + + s[max - 1] = '\0'; + return len; +} /* }}} size_t cdtime_to_iso8601 */ + /* vim: set sw=2 sts=2 et fdm=marker : */ diff --git a/src/utils_time.h b/src/utils_time.h index 0fd809ac..0081957d 100644 --- a/src/utils_time.h +++ b/src/utils_time.h @@ -66,5 +66,11 @@ cdtime_t cdtime (void); +/* format a cdtime_t value in ISO 8601 format: + * returns the number of characters written to the string (not including the + * terminating null byte or 0 on error; the function ensures that the string + * is null terminated */ +size_t cdtime_to_iso8601 (char *s, size_t max, cdtime_t t); + #endif /* UTILS_TIME_H */ /* vim: set sw=2 sts=2 et : */