postgresql plugin: Added 'CommitInterval' config option.
authorSebastian Harl <sh@tokkee.org>
Thu, 25 Oct 2012 09:00:09 +0000 (11:00 +0200)
committerSebastian Harl <sh@tokkee.org>
Wed, 14 Nov 2012 18:45:13 +0000 (19:45 +0100)
If specified, this option causes a writer to put several updates into a single
transaction. This transaction will last for the specified amount of time (in
seconds). By default, each update would be executed in a separate transaction
causing quite some overhead.

src/postgresql.c

index 594fa32..9b264d9 100644 (file)
@@ -56,6 +56,7 @@
 #define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
 #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
 #define log_info(...) INFO ("postgresql: " __VA_ARGS__)
+#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__)
 
 #ifndef C_PSQL_DEFAULT_CONF
 # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
@@ -136,6 +137,10 @@ typedef struct {
 
        cdtime_t interval;
 
+       /* writer "caching" settings */
+       cdtime_t commit_interval;
+       cdtime_t next_commit;
+
        char *host;
        char *port;
        char *database;
@@ -196,6 +201,9 @@ static c_psql_database_t *c_psql_database_new (const char *name)
 
        db->interval   = 0;
 
+       db->commit_interval = 0;
+       db->next_commit     = 0;
+
        db->database   = sstrdup (name);
        db->host       = NULL;
        db->port       = NULL;
@@ -230,6 +238,10 @@ static void c_psql_database_delete (void *data)
        sfree (db->writers);
        db->writers_num = 0;
 
+       /* wait for the lock to be released by the last writer */
+       pthread_mutex_lock (&db->db_lock);
+       pthread_mutex_unlock (&db->db_lock);
+
        pthread_mutex_destroy (&db->db_lock);
 
        sfree (db->database);
@@ -725,6 +737,45 @@ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl,
        return string;
 } /* values_to_sqlarray */
 
+static int c_psql_begin (c_psql_database_t *db)
+{
+       PGresult *r = PQexec (db->conn, "BEGIN");
+
+       int status = 1;
+
+       if (r != NULL) {
+               if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+                       db->next_commit = cdtime() + db->commit_interval;
+                       status = 0;
+               }
+               else
+                       log_warn ("Failed to initiate ('BEGIN') transaction: %s",
+                                       PQerrorMessage (db->conn));
+               PQclear (r);
+       }
+       return status;
+} /* c_psql_begin */
+
+static int c_psql_commit (c_psql_database_t *db)
+{
+       PGresult *r = PQexec (db->conn, "COMMIT");
+
+       int status = 1;
+
+       if (r != NULL) {
+               if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+                       db->next_commit = cdtime () + db->commit_interval;
+                       log_debug ("Successfully committed transaction.");
+                       status = 0;
+               }
+               else
+                       log_warn ("Failed to commit transaction: %s",
+                                       PQerrorMessage (db->conn));
+               PQclear (r);
+       }
+       return status;
+} /* c_psql_commit */
+
 static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
                user_data_t *ud)
 {
@@ -777,6 +828,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
                return -1;
        }
 
+       if ((db->commit_interval > 0)
+                       && (db->next_commit == 0))
+               c_psql_begin (db);
+
        for (i = 0; i < db->writers_num; ++i) {
                c_psql_writer_t *writer;
                PGresult *res;
@@ -785,13 +840,17 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
 
                if (values_type_to_sqlarray (ds,
                                        values_type_str, sizeof (values_type_str),
-                                       writer->store_rates) == NULL)
+                                       writer->store_rates) == NULL) {
+                       pthread_mutex_unlock (&db->db_lock);
                        return -1;
+               }
 
                if (values_to_sqlarray (ds, vl,
                                        values_str, sizeof (values_str),
-                                       writer->store_rates) == NULL)
+                                       writer->store_rates) == NULL) {
+                       pthread_mutex_unlock (&db->db_lock);
                        return -1;
+               }
 
                params[7] = values_type_str;
                params[8] = values_str;
@@ -833,6 +892,11 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
                success = 1;
        }
 
+       if ((db->next_commit > 0)
+                       && (cdtime () > db->next_commit))
+               if (c_psql_commit (db) == 0)
+                       c_psql_begin (db);
+
        pthread_mutex_unlock (&db->db_lock);
 
        if (! success)
@@ -1055,6 +1119,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
                                        &db->writers, &db->writers_num);
                else if (0 == strcasecmp (c->key, "Interval"))
                        cf_util_get_cdtime (c, &db->interval);
+               else if (strcasecmp ("CommitInterval", c->key) == 0)
+                       cf_util_get_cdtime (c, &db->commit_interval);
                else
                        log_warn ("Ignoring unknown config key \"%s\".", c->key);
        }
@@ -1109,6 +1175,11 @@ static int c_psql_config_database (oconfig_item_t *ci)
        if (db->writers_num > 0) {
                plugin_register_write (cb_name, c_psql_write, &ud);
        }
+       else if (db->commit_interval > 0) {
+               log_warn ("Database '%s': You do not have any writers assigned to "
+                               "this database connection. Setting 'CommitInterval' does "
+                               "not have any effect.", db->database);
+       }
        return 0;
 } /* c_psql_config_database */