Merge branch 'sh/postgresql-writer'
authorFlorian Forster <octo@collectd.org>
Sat, 17 Nov 2012 08:39:33 +0000 (09:39 +0100)
committerFlorian Forster <octo@collectd.org>
Sat, 17 Nov 2012 08:39:33 +0000 (09:39 +0100)
Conflicts:
src/collectd.conf.pod
src/postgresql.c

1  2 
src/collectd.conf.pod
src/postgresql.c

diff --combined src/collectd.conf.pod
@@@ -25,32 -25,22 +25,32 @@@ controls which plugins to load. These p
  behavior.
  
  The syntax of this config file is similar to the config file of the famous
 -B<Apache Webserver>. Each line contains either a key-value-pair or a
 -section-start or -end. Empty lines and everything after the hash-symbol `#' is
 -ignored. Values are either string, enclosed in double-quotes,
 -(floating-point-)numbers or a boolean expression, i.E<nbsp>e. either B<true> or
 -B<false>. String containing of only alphanumeric characters and underscores do
 -not need to be quoted. Lines may be wrapped by using `\' as the last character
 -before the newline. This allows long lines to be split into multiple lines.
 -Quoted strings may be wrapped as well. However, those are treated special in
 -that whitespace at the beginning of the following lines will be ignored, which
 -allows for nicely indenting the wrapped lines.
 -
 -The configuration is read and processed in order, i.E<nbsp>e. from top to
 -bottom. So the plugins are loaded in the order listed in this config file. It
 -is a good idea to load any logging plugins first in order to catch messages
 -from plugins during configuration. Also, the C<LoadPlugin> option B<must> occur
 -B<before> the C<E<lt>Plugin ...E<gt>> block.
 +I<Apache> webserver. Each line contains either an option (a key and a list of
 +one or more values) or a section-start or -end. Empty lines and everything
 +after a non-quoted hash-symbol (C<#>) is ignored. I<Keys> are unquoted
 +strings, consisting only of alphanumeric characters and the underscore (C<_>)
 +character. Keys are handled case insensitive by I<collectd> itself and all
 +plugins included with it. I<Values> can either be an I<unquoted string>, a
 +I<quoted string> (enclosed in double-quotes) a I<number> or a I<boolean>
 +expression. I<Unquoted strings> consist of only alphanumeric characters and
 +underscores (C<_>) and do not need to be quoted. I<Quoted strings> are
 +enclosed in double quotes (C<">). You can use the backslash character (C<\>)
 +to include double quotes as part of the string. I<Numbers> can be specified in
 +decimal and floating point format (using a dot C<.> as decimal separator),
 +hexadecimal when using the C<0x> prefix and octal with a leading zero (C<0>).
 +I<Boolean> values are either B<true> or B<false>.
 +
 +Lines may be wrapped by using C<\> as the last character before the newline.
 +This allows long lines to be split into multiple lines. Quoted strings may be
 +wrapped as well. However, those are treated special in that whitespace at the
 +beginning of the following lines will be ignored, which allows for nicely
 +indenting the wrapped lines.
 +
 +The configuration is read and processed in order, i.e. from top to bottom. So
 +the plugins are loaded in the order listed in this config file. It is a good
 +idea to load any logging plugins first in order to catch messages from plugins
 +during configuration. Also, the C<LoadPlugin> option B<must> occur B<before>
 +the appropriate C<E<lt>Plugin ...E<gt>> block.
  
  =head1 GLOBAL OPTIONS
  
@@@ -96,12 -86,6 +96,12 @@@ By default, this is disabled. As a spec
  either C<perl> or C<python>, the default is changed to enabled in order to keep
  the average user from ever having to deal with this low level linking stuff.
  
 +=item B<Interval> I<Seconds>
 +
 +Sets a plugin-specific interval for collecting metrics. This overrides the
 +global B<Interval> setting. If a plugin provides own support for specifying an
 +interval, that setting will take precedence.
 +
  =back
  
  =item B<Include> I<Path>
@@@ -199,122 -183,12 +199,122 @@@ C<Plugin>-Section. Which options exist 
  require external configuration, too. The C<apache plugin>, for example,
  required C<mod_status> to be configured in the webserver you're going to
  collect data from. These plugins are listed below as well, even if they don't
 -require any configuration within collectd's configfile.
 +require any configuration within collectd's configuration file.
  
  A list of all plugins and a short summary for each plugin can be found in the
  F<README> file shipped with the sourcecode and hopefully binary packets as
  well.
  
 +=head2 Plugin C<aggregation>
 +
 +The I<Aggregation plugin> makes it possible to aggregate several values into
 +one using aggregation functions such as I<sum>, I<average>, I<min> and I<max>.
 +This can be put to a wide variety of uses, e.g. average and total CPU
 +statistics for your entire fleet.
 +
 +The grouping is powerful but, as with many powerful tools, may be a bit
 +difficult to wrap your head around. The grouping will therefore be
 +demonstrated using an example: The average and sum of the CPU usage across
 +all CPUs of each host is to be calculated.
 +
 +To select all the affected values for our example, set C<Plugin cpu> and
 +C<Type cpu>. The other values are left unspecified, meaning "all values". The
 +I<Host>, I<Plugin>, I<PluginInstance>, I<Type> and I<TypeInstance> options
 +work as if they were specified in the C<WHERE> clause of an C<SELECT> SQL
 +statement.
 +
 +  Plugin "cpu"
 +  Type "cpu"
 +
 +Although the I<Host>, I<PluginInstance> (CPU number, i.e. 0, 1, 2, ...)  and
 +I<TypeInstance> (idle, user, system, ...) fields are left unspecified in the
 +example, the intention is to have a new value for each host / type instance
 +pair. This is achieved by "grouping" the values using the C<GroupBy> option.
 +It can be specified multiple times to group by more than one field.
 +
 +  GroupBy "Host"
 +  GroupBy "TypeInstance"
 +
 +We do neither specify nor group by I<plugin instance> (the CPU number), so all
 +metrics that differ in the CPU number only will be aggregated. Each
 +aggregation needs I<at least one> such field, otherwise no aggregation would
 +take place.
 +
 +The full example configuration looks like this:
 +
 + <Plugin "aggregation">
 +   <Aggregation>
 +     Plugin "cpu"
 +     Type "cpu"
 +     
 +     GroupBy "Host"
 +     GroupBy "TypeInstance"
 +     
 +     CalculateSum true
 +     CalculateAverage true
 +   </Aggregation>
 + </Plugin>
 +
 +There are a couple of limitations you should be aware of:
 +
 +=over 4
 +
 +=item
 +
 +The I<Type> cannot be left unspecified, because it is not reasonable to add
 +apples to oranges. Also, the internal lookup structure won't work if you try
 +to group by type.
 +
 +=item
 +
 +There must be at least one unspecified, ungrouped field. Otherwise nothing
 +will be aggregated.
 +
 +=back
 +
 +As you can see in the example above, each aggregation has its own
 +B<Aggregation> block. You can have multiple aggregation blocks and aggregation
 +blocks may match the same values, i.e. one value list can update multiple
 +aggregations. The following options are valid inside B<Aggregation> blocks:
 +
 +=over 4
 +
 +=item B<Host> I<Host>
 +
 +=item B<Plugin> I<Plugin>
 +
 +=item B<PluginInstance> I<PluginInstance>
 +
 +=item B<Type> I<Type>
 +
 +=item B<TypeInstance> I<TypeInstance>
 +
 +Selects the value lists to be added to this aggregation. B<Type> must be a
 +valid data set name, see L<types.db(5)> for details.
 +
 +=item B<GroupBy> B<Host>|B<Plugin>|B<PluginInstance>|B<TypeInstance>
 +
 +Group valued by the specified field. The B<GroupBy> option may be repeated to
 +group by multiple fields.
 +
 +=item B<CalculateNum> B<true>|B<false>
 +
 +=item B<CalculateSum> B<true>|B<false>
 +
 +=item B<CalculateAverage> B<true>|B<false>
 +
 +=item B<CalculateMinimum> B<true>|B<false>
 +
 +=item B<CalculateMaximum> B<true>|B<false>
 +
 +=item B<CalculateStddev> B<true>|B<false>
 +
 +Boolean options for enabling calculation of the number of value lists, their
 +sum, average, minimum, maximum andE<nbsp>/ or standard deviation. All options
 +are disabled by default.
 +
 +=back
 +
  =head2 Plugin C<amqp>
  
  The I<AMQMP plugin> can be used to communicate with other instances of
@@@ -336,8 -210,6 +336,8 @@@ possibly filtering or messages
   #   Persistent false
   #   Format "command"
   #   StoreRates false
 + #   GraphitePrefix "collectd."
 + #   GraphiteEscapeChar "_"
     </Publish>
     
     # Receive values from an AMQP broker
@@@ -438,10 -310,6 +438,10 @@@ If set to B<JSON>, the values are encod
  an easy and straight forward exchange format. The C<Content-Type> header field
  will be set to C<application/json>.
  
 +If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
 +"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
 +C<text/graphite>.
 +
  A subscribing client I<should> use the C<Content-Type> header field to
  determine how to decode the values. Currently, the I<AMQP plugin> itself can
  only decode the B<Command> format.
@@@ -456,25 -324,6 +456,25 @@@ using the internal value cache
  Please note that currently this option is only used if the B<Format> option has
  been set to B<JSON>.
  
 +=item B<GraphitePrefix> (Publish and B<Format>=I<Graphite> only)
 +
 +A prefix can be added in the metric name when outputting in the I<Graphite> format.
 +It's added before the I<Host> name.
 +Metric name will be "<prefix><host><postfix><plugin><type><name>"
 +
 +=item B<GraphitePostfix> (Publish and B<Format>=I<Graphite> only)
 +
 +A postfix can be added in the metric name when outputting in the I<Graphite> format.
 +It's added after the I<Host> name.
 +Metric name will be "<prefix><host><postfix><plugin><type><name>"
 +
 +=item B<GraphiteEscapeChar> (Publish and B<Format>=I<Graphite> only)
 +
 +Specify a character to replace dots (.) in the host part of the metric name.
 +In I<Graphite> metric name, dots are used as separators between different
 +metric parts (host, plugin, type).
 +Default is "_" (I<Underscore>).
 +
  =back
  
  =head2 Plugin C<apache>
@@@ -2090,17 -1939,6 +2090,17 @@@ The C<memcached plugin> connects to a m
  about cache utilization, memory and bandwidth used.
  L<http://www.danga.com/memcached/>
  
 + <Plugin "memcached">
 +   <Instance "name">
 +     Host "memcache.example.com"
 +     Port 11211
 +   </Instance>
 + </Plugin>
 +
 +The plugin configuration consists of one or more B<Instance> blocks which
 +specify one I<memcached> connection each. Within the B<Instance> blocks, the
 +following options are allowed:
 +
  =over 4
  
  =item B<Host> I<Hostname>
@@@ -2111,11 -1949,6 +2111,11 @@@ Hostname to connect to. Defaults to B<1
  
  TCP-Port to connect to. Defaults to B<11211>.
  
 +=item B<Socket> I<Path>
 +
 +Connect to I<memcached> using the UNIX domain socket at I<Path>. If this
 +setting is given, the B<Host> and B<Port> settings are ignored.
 +
  =back
  
  =head2 Plugin C<modbus>
@@@ -3210,16 -3043,6 +3210,16 @@@ IP-address may be used in a filename i
  lookups. The default is to do reverse lookups to preserve backwards
  compatibility, though.
  
 +=item B<IncludeUnitID> B<true>|B<false>
 +
 +When a peer is a refclock, include the unit ID in the I<type instance>.
 +Defaults to B<false> for backward compatibility.
 +
 +If two refclock peers use the same driver and this is B<false>, the plugin will
 +try to write simultaneous measurements from both to the same type instance.
 +This will result in error messages in the log and only one set of measurements
 +making it through.
 +
  =back
  
  =head2 Plugin C<nut>
@@@ -3627,6 -3450,13 +3627,13 @@@ which are available in a PostgreSQL dat
  statistics provided by PostgreSQL without the need to upgrade your collectd
  installation.
  
+ Starting with version 5.2, the C<postgresql> plugin supports writing data to
+ PostgreSQL databases as well. This has been implemented in a generic way. You
+ need to specify an SQL statement which will then be executed by collectd in
+ order to write the data (see below for details). The benefit of that approach
+ is that there is no fixed database layout. Rather, the layout may be optimized
+ for the current setup.
  The B<PostgreSQL Documentation> manual can be found at
  L<http://www.postgresql.org/docs/manuals/>.
  
        </Result>
      </Query>
  
+     <Writer sqlstore>
+       Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+       StoreRates true
+     </Writer>
      <Database foo>
        Host "hostname"
        Port "5432"
        Query backend # predefined
        Query rt36_tickets
      </Database>
+     <Database qux>
+       # ...
+       Writer sqlstore
+       CommitInterval 10
+     </Database>
    </Plugin>
  
  The B<Query> block defines one database query which may later be used by a
@@@ -3720,11 -3561,6 +3738,11 @@@ used, the parameter expands to "localho
  
  The name of the database of the current connection.
  
 +=item I<instance>
 +
 +The name of the database plugin instance. See the B<Instance> option of the
 +database specification below for details.
 +
  =item I<username>
  
  The username used to connect to the database.
@@@ -3838,23 -3674,84 +3856,101 @@@ This query collects the on-disk size o
  
  =back
  
 +In addition, the following detailed queries are available by default. Please
 +note that each of those queries collects information B<by table>, thus,
 +potentially producing B<a lot> of data. For details see the description of the
 +non-by_table queries above.
 +
 +=over 4
 +
 +=item B<queries_by_table>
 +
 +=item B<query_plans_by_table>
 +
 +=item B<table_states_by_table>
 +
 +=item B<disk_io_by_table>
 +
 +=back
 +
+ The B<Writer> block defines a PostgreSQL writer backend. It accepts a single
+ mandatory argument specifying the name of the writer. This will then be used
+ in the B<Database> specification in order to activate the writer instance. The
+ names of all writers have to be unique. The following options may be
+ specified:
+ =over 4
+ =item B<Statement> I<sql statement>
+ This mandatory option specifies the SQL statement that will be executed for
+ each submitted value. A single SQL statement is allowed only. Anything after
+ the first semicolon will be ignored.
+ Nine parameters will be passed to the statement and should be specified as
+ tokens B<$1>, B<$2>, through B<$9> in the statement string. The following
+ values are made available through those parameters:
+ =over 4
+ =item B<$1>
+ The timestamp of the queried value as a floating point number.
+ =item B<$2>
+ The hostname of the queried value.
+ =item B<$3>
+ The plugin name of the queried value.
+ =item B<$4>
+ The plugin instance of the queried value. This value may be B<NULL> if there
+ is no plugin instance.
+ =item B<$5>
+ The type of the queried value (cf. L<types.db(5)>).
+ =item B<$6>
+ The type instance of the queried value. This value may be B<NULL> if there is
+ no type instance.
+ =item B<$7>
+ An array of names for the submitted values (i.E<nbsp>e., the name of the data
+ sources of the submitted value-list).
+ =item B<$8>
+ An array of types for the submitted values (i.E<nbsp>e., the type of the data
+ sources of the submitted value-list; C<counter>, C<gauge>, ...). Note, that if
+ B<StoreRates> is enabled (which is the default, see below), all types will be
+ C<gauge>.
+ =item B<$9>
+ An array of the submitted values. The dimensions of the value name and value
+ arrays match.
+ =back
+ In general, it is advisable to create and call a custom function in the
+ PostgreSQL database for this purpose. Any procedural language supported by
+ PostgreSQL will do (see chapter "Server Programming" in the PostgreSQL manual
+ for details).
+ =item B<StoreRates> B<false>|B<true>
+ If set to B<true> (the default), convert counter values to rates. If set to
+ B<false> counter values are stored as is, i.E<nbsp>e. as an increasing integer
+ number.
+ =back
  The B<Database> block defines one PostgreSQL database for which to collect
  statistics. It accepts a single mandatory argument which specifies the
  database name. None of the other options are required. PostgreSQL will use
@@@ -3870,6 -3767,17 +3966,17 @@@ for details
  Specify the interval with which the database should be queried. The default is
  to use the global B<Interval> setting.
  
+ =item B<CommitInterval> I<seconds>
+ This option may be used for database connections which have "writers" assigned
+ (see above). If specified, it causes a writer to put several updates into a
+ single transaction. This transaction will last for the specified amount of
+ time. By default, each update will be executed in a separate transaction. Each
+ transaction generates a fair amount of overhead which can, thus, be reduced by
+ activating this option. The draw-back is, that data covering the specified
+ amount of time will be lost, for example, if a single statement within the
+ transaction fails or if the database server crashes.
  =item B<Host> I<hostname>
  
  Specify the hostname or IP of the PostgreSQL server to connect to. If the
@@@ -3900,13 -3808,6 +4007,13 @@@ Specify the password to be used when co
  Specify whether to use an SSL connection when contacting the server. The
  following modes are supported:
  
 +=item B<Instance> I<name>
 +
 +Specify the plugin instance name that should be used instead of the database
 +name (which is the default, if this option has not been specified). This
 +allows to query multiple databases of the same name on the same host (e.g.
 +when running multiple database server versions in parallel).
 +
  =over 4
  
  =item I<disable>
@@@ -3942,11 -3843,36 +4049,36 @@@ B<PostgreSQL Documentation> for details
  
  =item B<Query> I<query>
  
- Specify a I<query> which should be executed for the database connection. This
- may be any of the predefined or user-defined queries. If no such option is
- given, it defaults to "backends", "transactions", "queries", "query_plans",
- "table_states", "disk_io" and "disk_usage". Else, the specified queries are
- used only.
+ Specifies a I<query> which should be executed in the context of the database
+ connection. This may be any of the predefined or user-defined queries. If no
+ such option is given, it defaults to "backends", "transactions", "queries",
+ "query_plans", "table_states", "disk_io" and "disk_usage" (unless a B<Writer>
+ has been specified). Else, the specified queries are used only.
+ =item B<Writer> I<writer>
+ Assigns the specified I<writer> backend to the database connection. This
+ causes all collected data to be send to the database using the settings
+ defined in the writer configuration (see the section "FILTER CONFIGURATION"
+ below for details on how to selectively send data to certain plugins).
+ Each writer will register a flush callback which may be used when having long
+ transactions enabled (see the B<CommitInterval> option above). When issuing
+ the B<FLUSH> command (see L<collectd-unixsock(5)> for details) the current
+ transaction will be committed right away. Two different kinds of flush
+ callbacks are available with the C<postgresql> plugin:
+ =over 4
+ =item B<postgresql>
+ Flush all writer backends.
+ =item B<postgresql->I<database>
+ Flush all writers of the specified I<database> only.
+ =back
  
  =back
  
@@@ -4267,10 -4193,6 +4399,10 @@@ The B<Port> option is the TCP port on w
  connections. Either a service name of a port number may be given. Please note
  that numerical port numbers must be given as a string, too.
  
 +=item B<Password> I<Password>
 +
 +Use I<Password> to authenticate when connecting to I<Redis>.
 +
  =item B<Timeout> I<Timeout in miliseconds>
  
  The B<Timeout> option set the socket timeout for node response. Since the Redis
@@@ -4328,50 -4250,6 +4460,50 @@@ Enables or disables the creation of RR
  locally, or B<DataDir> is set to a relative path, this will not work as
  expected. Default is B<true>.
  
 +=item B<StepSize> I<Seconds>
 +
 +B<Force> the stepsize of newly created RRD-files. Ideally (and per default)
 +this setting is unset and the stepsize is set to the interval in which the data
 +is collected. Do not use this option unless you absolutely have to for some
 +reason. Setting this option may cause problems with the C<snmp plugin>, the
 +C<exec plugin> or when the daemon is set up to receive data from other hosts.
 +
 +=item B<HeartBeat> I<Seconds>
 +
 +B<Force> the heartbeat of newly created RRD-files. This setting should be unset
 +in which case the heartbeat is set to twice the B<StepSize> which should equal
 +the interval in which data is collected. Do not set this option unless you have
 +a very good reason to do so.
 +
 +=item B<RRARows> I<NumRows>
 +
 +The C<rrdtool plugin> calculates the number of PDPs per CDP based on the
 +B<StepSize>, this setting and a timespan. This plugin creates RRD-files with
 +three times five RRAs, i. e. five RRAs with the CFs B<MIN>, B<AVERAGE>, and
 +B<MAX>. The five RRAs are optimized for graphs covering one hour, one day, one
 +week, one month, and one year.
 +
 +So for each timespan, it calculates how many PDPs need to be consolidated into
 +one CDP by calculating:
 +  number of PDPs = timespan / (stepsize * rrarows)
 +
 +Bottom line is, set this no smaller than the width of you graphs in pixels. The
 +default is 1200.
 +
 +=item B<RRATimespan> I<Seconds>
 +
 +Adds an RRA-timespan, given in seconds. Use this option multiple times to have
 +more then one RRA. If this option is never used, the built-in default of (3600,
 +86400, 604800, 2678400, 31622400) is used.
 +
 +For more information on how RRA-sizes are calculated see B<RRARows> above.
 +
 +=item B<XFF> I<Factor>
 +
 +Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
 +I<Factor> must be in the range C<[0.0-1.0)>, i.e. between zero (inclusive) and
 +one (exclusive).
 +
  =back
  
  =head2 Plugin C<rrdtool>
@@@ -4429,8 -4307,6 +4561,8 @@@ For more information on how RRA-sizes a
  =item B<XFF> I<Factor>
  
  Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
 +I<Factor> must be in the range C<[0.0-1.0)>, i.e. between zero (inclusive) and
 +one (exclusive).
  
  =item B<CacheFlush> I<Seconds>
  
@@@ -4546,11 -4422,6 +4678,11 @@@ and available space of each device wil
  This option is only available if the I<Swap plugin> can read C</proc/swaps>
  (under Linux) or use the L<swapctl(2)> mechanism (under I<Solaris>).
  
 +=item B<ReportBytes> B<false>|B<true>
 +
 +When enabled, the I<swap I/O> is reported in bytes. When disabled, the default,
 +I<swap I/O> is reported in pages. This option is available under Linux only.
 +
  =back
  
  =head2 Plugin C<syslog>
@@@ -5181,7 -5052,7 +5313,7 @@@ number
  If set to B<true>, the plugin instance and type instance will be in their own
  path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
  default), the plugin and plugin instance (and likewise the type and type
 -instance) are put into once component, for example C<host.cpu-0.cpu-idle>.
 +instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
  
  =item B<AlwaysAppendDS> B<false>|B<true>
  
diff --combined src/postgresql.c
@@@ -1,7 -1,7 +1,7 @@@
  /**
   * collectd - src/postgresql.c
-  * Copyright (C) 2008, 2009  Sebastian Harl
-  * Copyright (C) 2009        Florian Forster
+  * Copyright (C) 2008-2012  Sebastian Harl
+  * Copyright (C) 2009       Florian Forster
   * All rights reserved.
   *
   * Redistribution and use in source and binary forms, with or without
  #include "configfile.h"
  #include "plugin.h"
  
+ #include "utils_cache.h"
  #include "utils_db_query.h"
  #include "utils_complain.h"
  
+ #if HAVE_PTHREAD_H
+ # include <pthread.h>
+ #endif
  #include <pg_config_manual.h>
  #include <libpq-fe.h>
  
  #define log_err(...) ERROR ("postgresql: " __VA_ARGS__)
  #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__)
  #define log_info(...) INFO ("postgresql: " __VA_ARGS__)
+ #define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__)
  
  #ifndef C_PSQL_DEFAULT_CONF
  # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf"
@@@ -95,7 -101,6 +101,7 @@@ typedef enum 
        C_PSQL_PARAM_DB,
        C_PSQL_PARAM_USER,
        C_PSQL_PARAM_INTERVAL,
 +      C_PSQL_PARAM_INSTANCE,
  } c_psql_param_t;
  
  /* Parameter configuration. Stored as `user data' in the query objects. */
@@@ -105,6 -110,12 +111,12 @@@ typedef struct 
  } c_psql_user_data_t;
  
  typedef struct {
+       char *name;
+       char *statement;
+       _Bool store_rates;
+ } c_psql_writer_t;
+ typedef struct {
        PGconn      *conn;
        c_complain_t conn_complaint;
  
        udb_query_t    **queries;
        size_t           queries_num;
  
+       c_psql_writer_t **writers;
+       size_t            writers_num;
+       /* make sure we don't access the database object in parallel */
+       pthread_mutex_t   db_lock;
        cdtime_t interval;
  
+       /* writer "caching" settings */
+       cdtime_t commit_interval;
+       cdtime_t next_commit;
        char *host;
        char *port;
        char *database;
        char *user;
        char *password;
  
 +      char *instance;
 +
        char *sslmode;
  
        char *krbsrvname;
  
        char *service;
+       int ref_cnt;
  } c_psql_database_t;
  
  static char *def_queries[] = {
  };
  static int def_queries_num = STATIC_ARRAY_SIZE (def_queries);
  
+ static c_psql_database_t *databases     = NULL;
+ static size_t             databases_num = 0;
  static udb_query_t      **queries       = NULL;
  static size_t             queries_num   = 0;
  
+ static c_psql_writer_t   *writers       = NULL;
+ static size_t             writers_num   = 0;
+ static int c_psql_begin (c_psql_database_t *db)
+ {
+       PGresult *r = PQexec (db->conn, "BEGIN");
+       int status = 1;
+       if (r != NULL) {
+               if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+                       db->next_commit = cdtime() + db->commit_interval;
+                       status = 0;
+               }
+               else
+                       log_warn ("Failed to initiate ('BEGIN') transaction: %s",
+                                       PQerrorMessage (db->conn));
+               PQclear (r);
+       }
+       return status;
+ } /* c_psql_begin */
+ static int c_psql_commit (c_psql_database_t *db)
+ {
+       PGresult *r = PQexec (db->conn, "COMMIT");
+       int status = 1;
+       if (r != NULL) {
+               if (PGRES_COMMAND_OK == PQresultStatus (r)) {
+                       db->next_commit = 0;
+                       log_debug ("Successfully committed transaction.");
+                       status = 0;
+               }
+               else
+                       log_warn ("Failed to commit transaction: %s",
+                                       PQerrorMessage (db->conn));
+               PQclear (r);
+       }
+       return status;
+ } /* c_psql_commit */
  static c_psql_database_t *c_psql_database_new (const char *name)
  {
        c_psql_database_t *db;
  
-       db = (c_psql_database_t *)malloc (sizeof (*db));
+       db = (c_psql_database_t *)realloc (databases,
+                       (databases_num + 1) * sizeof (*db));
        if (NULL == db) {
                log_err ("Out of memory.");
                return NULL;
        }
  
+       databases = db;
+       db = databases + databases_num;
+       ++databases_num;
        db->conn = NULL;
  
        C_COMPLAIN_INIT (&db->conn_complaint);
        db->queries        = NULL;
        db->queries_num    = 0;
  
+       db->writers        = NULL;
+       db->writers_num    = 0;
+       pthread_mutex_init (&db->db_lock, /* attrs = */ NULL);
        db->interval   = 0;
  
+       db->commit_interval = 0;
+       db->next_commit     = 0;
        db->database   = sstrdup (name);
        db->host       = NULL;
        db->port       = NULL;
        db->user       = NULL;
        db->password   = NULL;
  
 +      db->instance   = sstrdup (name);
 +
        db->sslmode    = NULL;
  
        db->krbsrvname = NULL;
  
        db->service    = NULL;
+       db->ref_cnt    = 0;
        return db;
  } /* c_psql_database_new */
  
@@@ -196,6 -275,17 +280,17 @@@ static void c_psql_database_delete (voi
  
        c_psql_database_t *db = data;
  
+       --db->ref_cnt;
+       /* readers and writers may access this database */
+       if (db->ref_cnt > 0)
+               return;
+       /* wait for the lock to be released by the last writer */
+       pthread_mutex_lock (&db->db_lock);
+       if (db->next_commit > 0)
+               c_psql_commit (db);
        PQfinish (db->conn);
        db->conn = NULL;
  
        sfree (db->queries);
        db->queries_num = 0;
  
+       sfree (db->writers);
+       db->writers_num = 0;
+       pthread_mutex_unlock (&db->db_lock);
+       pthread_mutex_destroy (&db->db_lock);
        sfree (db->database);
        sfree (db->host);
        sfree (db->port);
        sfree (db->user);
        sfree (db->password);
  
 +      sfree (db->instance);
 +
        sfree (db->sslmode);
  
        sfree (db->krbsrvname);
  
        sfree (db->service);
+       /* don't care about freeing or reordering the 'databases' array
+        * this is done in 'shutdown' */
        return;
  } /* c_psql_database_delete */
  
@@@ -230,7 -328,7 +335,7 @@@ static int c_psql_connect (c_psql_datab
        int   buf_len = sizeof (conninfo);
        int   status;
  
-       if (! db)
+       if ((! db) || (! db->database))
                return -1;
  
        status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database);
@@@ -278,9 -376,8 +383,9 @@@ static int c_psql_check_connection (c_p
  
                if (CONNECTION_OK != PQstatus (db->conn)) {
                        c_complain (LOG_ERR, &db->conn_complaint,
 -                                      "Failed to connect to database %s: %s",
 -                                      db->database, PQerrorMessage (db->conn));
 +                                      "Failed to connect to database %s (%s): %s",
 +                                      db->database, db->instance,
 +                                      PQerrorMessage (db->conn));
                        return -1;
                }
  
@@@ -345,13 -442,9 +450,13 @@@ static PGresult *c_psql_exec_query_para
                        case C_PSQL_PARAM_INTERVAL:
                                ssnprintf (interval, sizeof (interval), "%.3f",
                                                (db->interval > 0)
 -                                              ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g);
 +                                              ? CDTIME_T_TO_DOUBLE (db->interval)
 +                                              : plugin_get_interval ());
                                params[i] = interval;
                                break;
 +                      case C_PSQL_PARAM_INSTANCE:
 +                              params[i] = db->instance;
 +                              break;
                        default:
                                assert (0);
                }
                        NULL, NULL, /* return text data */ 0);
  } /* c_psql_exec_query_params */
  
+ /* db->db_lock must be locked when calling this function */
  static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
                udb_query_preparation_area_t *prep_area)
  {
        else if ((NULL == data) || (0 == data->params_num))
                res = c_psql_exec_query_noparams (db, q);
        else {
 -              log_err ("Connection to database \"%s\" does not support parameters "
 -                              "(protocol version %d) - cannot execute query \"%s\".",
 -                              db->database, db->proto_version,
 +              log_err ("Connection to database \"%s\" (%s) does not support "
 +                              "parameters (protocol version %d) - "
 +                              "cannot execute query \"%s\".",
 +                              db->database, db->instance, db->proto_version,
                                udb_query_get_name (q));
                return -1;
        }
  
+       /* give c_psql_write() a chance to acquire the lock if called recursively
+        * through dispatch_values(); this will happen if, both, queries and
+        * writers are configured for a single connection */
+       pthread_mutex_unlock (&db->db_lock);
        column_names = NULL;
        column_values = NULL;
  
- #define BAIL_OUT(status) \
-       sfree (column_names); \
-       sfree (column_values); \
-       PQclear (res); \
-       return status
        if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+               pthread_mutex_lock (&db->db_lock);
                log_err ("Failed to execute SQL query: %s",
                                PQerrorMessage (db->conn));
                log_info ("SQL query was: %s",
                                udb_query_get_statement (q));
-               BAIL_OUT (-1);
+               PQclear (res);
+               return -1;
        }
  
+ #define BAIL_OUT(status) \
+       sfree (column_names); \
+       sfree (column_values); \
+       PQclear (res); \
+       pthread_mutex_lock (&db->db_lock); \
+       return status
        rows_num = PQntuples (res);
        if (1 > rows_num) {
                BAIL_OUT (0);
                host = db->host;
  
        status = udb_query_prepare_result (q, prep_area, host, "postgresql",
 -                      db->database, column_names, (size_t) column_num, db->interval);
 +                      db->instance, column_names, (size_t) column_num, db->interval);
        if (0 != status) {
                log_err ("udb_query_prepare_result failed with status %i.",
                                status);
@@@ -500,10 -602,14 +615,15 @@@ static int c_psql_read (user_data_t *ud
        db = ud->data;
  
        assert (NULL != db->database);
 +      assert (NULL != db->instance);
+       assert (NULL != db->queries);
+       pthread_mutex_lock (&db->db_lock);
  
-       if (0 != c_psql_check_connection (db))
+       if (0 != c_psql_check_connection (db)) {
+               pthread_mutex_unlock (&db->db_lock);
                return -1;
+       }
  
        for (i = 0; i < db->queries_num; ++i)
        {
                        success = 1;
        }
  
+       pthread_mutex_unlock (&db->db_lock);
        if (! success)
                return -1;
        return 0;
  } /* c_psql_read */
  
+ static char *values_name_to_sqlarray (const data_set_t *ds,
+               char *string, size_t string_len)
+ {
+       char  *str_ptr;
+       size_t str_len;
+       int i;
+       str_ptr = string;
+       str_len = string_len;
+       for (i = 0; i < ds->ds_num; ++i) {
+               int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name);
+               if (status < 1)
+                       return NULL;
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value names");
+               return NULL;
+       }
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+       return string;
+ } /* values_name_to_sqlarray */
+ static char *values_type_to_sqlarray (const data_set_t *ds,
+               char *string, size_t string_len, _Bool store_rates)
+ {
+       char  *str_ptr;
+       size_t str_len;
+       int i;
+       str_ptr = string;
+       str_len = string_len;
+       for (i = 0; i < ds->ds_num; ++i) {
+               int status;
+               if (store_rates)
+                       status = ssnprintf(str_ptr, str_len, ",'gauge'");
+               else
+                       status = ssnprintf(str_ptr, str_len, ",'%s'",
+                                       DS_TYPE_TO_STRING (ds->ds[i].type));
+               if (status < 1) {
+                       str_len = 0;
+                       break;
+               }
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value types");
+               return NULL;
+       }
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+       return string;
+ } /* values_type_to_sqlarray */
+ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl,
+               char *string, size_t string_len, _Bool store_rates)
+ {
+       char  *str_ptr;
+       size_t str_len;
+       gauge_t *rates = NULL;
+       int i;
+       str_ptr = string;
+       str_len = string_len;
+       for (i = 0; i < vl->values_len; ++i) {
+               int status = 0;
+               if ((ds->ds[i].type != DS_TYPE_GAUGE)
+                               && (ds->ds[i].type != DS_TYPE_COUNTER)
+                               && (ds->ds[i].type != DS_TYPE_DERIVE)
+                               && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
+                       log_err ("c_psql_write: Unknown data source type: %i",
+                                       ds->ds[i].type);
+                       sfree (rates);
+                       return NULL;
+               }
+               if (ds->ds[i].type == DS_TYPE_GAUGE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%f", vl->values[i].gauge);
+               else if (store_rates) {
+                       if (rates == NULL)
+                               rates = uc_get_rate (ds, vl);
+                       if (rates == NULL) {
+                               log_err ("c_psql_write: Failed to determine rate");
+                               return NULL;
+                       }
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%lf", rates[i]);
+               }
+               else if (ds->ds[i].type == DS_TYPE_COUNTER)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%llu", vl->values[i].counter);
+               else if (ds->ds[i].type == DS_TYPE_DERIVE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%"PRIi64, vl->values[i].derive);
+               else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%"PRIu64, vl->values[i].absolute);
+               if (status < 1) {
+                       str_len = 0;
+                       break;
+               }
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+       sfree (rates);
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value list");
+               return NULL;
+       }
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+       return string;
+ } /* values_to_sqlarray */
+ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
+               user_data_t *ud)
+ {
+       c_psql_database_t *db;
+       char time_str[32];
+       char values_name_str[1024];
+       char values_type_str[1024];
+       char values_str[1024];
+       const char *params[9];
+       int success = 0;
+       int i;
+       if ((ud == NULL) || (ud->data == NULL)) {
+               log_err ("c_psql_write: Invalid user data.");
+               return -1;
+       }
+       db = ud->data;
+       assert (db->database != NULL);
+       assert (db->writers != NULL);
+       if (cdtime_to_iso8601 (time_str, sizeof (time_str), vl->time) == 0) {
+               log_err ("c_psql_write: Failed to convert time to ISO 8601 format");
+               return -1;
+       }
+       if (values_name_to_sqlarray (ds,
+                               values_name_str, sizeof (values_name_str)) == NULL)
+               return -1;
+ #define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v))
+       params[0] = time_str;
+       params[1] = vl->host;
+       params[2] = vl->plugin;
+       params[3] = VALUE_OR_NULL(vl->plugin_instance);
+       params[4] = vl->type;
+       params[5] = VALUE_OR_NULL(vl->type_instance);
+       params[6] = values_name_str;
+ #undef VALUE_OR_NULL
+       pthread_mutex_lock (&db->db_lock);
+       if (0 != c_psql_check_connection (db)) {
+               pthread_mutex_unlock (&db->db_lock);
+               return -1;
+       }
+       if ((db->commit_interval > 0)
+                       && (db->next_commit == 0))
+               c_psql_begin (db);
+       for (i = 0; i < db->writers_num; ++i) {
+               c_psql_writer_t *writer;
+               PGresult *res;
+               writer = db->writers[i];
+               if (values_type_to_sqlarray (ds,
+                                       values_type_str, sizeof (values_type_str),
+                                       writer->store_rates) == NULL) {
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+               if (values_to_sqlarray (ds, vl,
+                                       values_str, sizeof (values_str),
+                                       writer->store_rates) == NULL) {
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+               params[7] = values_type_str;
+               params[8] = values_str;
+               res = PQexecParams (db->conn, writer->statement,
+                               STATIC_ARRAY_SIZE (params), NULL,
+                               (const char *const *)params,
+                               NULL, NULL, /* return text data */ 0);
+               if ((PGRES_COMMAND_OK != PQresultStatus (res))
+                               && (PGRES_TUPLES_OK != PQresultStatus (res))) {
+                       if ((CONNECTION_OK != PQstatus (db->conn))
+                                       && (0 == c_psql_check_connection (db))) {
+                               PQclear (res);
+                               /* try again */
+                               res = PQexecParams (db->conn, writer->statement,
+                                               STATIC_ARRAY_SIZE (params), NULL,
+                                               (const char *const *)params,
+                                               NULL, NULL, /* return text data */ 0);
+                               if ((PGRES_COMMAND_OK == PQresultStatus (res))
+                                               || (PGRES_TUPLES_OK == PQresultStatus (res))) {
+                                       success = 1;
+                                       continue;
+                               }
+                       }
+                       log_err ("Failed to execute SQL query: %s",
+                                       PQerrorMessage (db->conn));
+                       log_info ("SQL query was: '%s', "
+                                       "params: %s, %s, %s, %s, %s, %s, %s, %s",
+                                       writer->statement,
+                                       params[0], params[1], params[2], params[3],
+                                       params[4], params[5], params[6], params[7]);
+                       /* this will abort any current transaction -> restart */
+                       if (db->next_commit > 0)
+                               c_psql_commit (db);
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+               success = 1;
+       }
+       if ((db->next_commit > 0)
+                       && (cdtime () > db->next_commit))
+               c_psql_commit (db);
+       pthread_mutex_unlock (&db->db_lock);
+       if (! success)
+               return -1;
+       return 0;
+ } /* c_psql_write */
+ /* We cannot flush single identifiers as all we do is to commit the currently
+  * running transaction, thus making sure that all written data is actually
+  * visible to everybody. */
+ static int c_psql_flush (cdtime_t timeout,
+               __attribute__((unused)) const char *ident,
+               user_data_t *ud)
+ {
+       c_psql_database_t *dbs = databases;
+       size_t dbs_num = databases_num;
+       size_t i;
+       if ((ud != NULL) && (ud->data != NULL)) {
+               dbs = ud->data;
+               dbs_num = 1;
+       }
+       for (i = 0; i < dbs_num; ++i) {
+               c_psql_database_t *db = dbs + i;
+               /* don't commit if the timeout is larger than the regular commit
+                * interval as in that case all requested data has already been
+                * committed */
+               if ((db->next_commit > 0) && (db->commit_interval > timeout))
+                       c_psql_commit (db);
+       }
+       return 0;
+ } /* c_psql_flush */
  static int c_psql_shutdown (void)
  {
+       size_t i = 0;
+       _Bool had_flush = 0;
        plugin_unregister_read_group ("postgresql");
  
+       for (i = 0; i < databases_num; ++i) {
+               c_psql_database_t *db = databases + i;
+               if (db->writers_num > 0) {
+                       char cb_name[DATA_MAX_NAME_LEN];
+                       ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s",
+                                       db->database);
+                       if (! had_flush) {
+                               plugin_unregister_flush ("postgresql");
+                               had_flush = 1;
+                       }
+                       plugin_unregister_flush (cb_name);
+                       plugin_unregister_write (cb_name);
+               }
+       }
        udb_query_free (queries, queries_num);
        queries = NULL;
        queries_num = 0;
  
+       sfree (writers);
+       writers = NULL;
+       writers_num = 0;
+       sfree (databases);
+       databases = NULL;
+       databases_num = 0;
        return 0;
  } /* c_psql_shutdown */
  
@@@ -572,8 -1036,6 +1050,8 @@@ static int config_query_param_add (udb_
                data->params[data->params_num] = C_PSQL_PARAM_USER;
        else if (0 == strcasecmp (param_str, "interval"))
                data->params[data->params_num] = C_PSQL_PARAM_INTERVAL;
 +      else if (0 == strcasecmp (param_str, "instance"))
 +              data->params[data->params_num] = C_PSQL_PARAM_INSTANCE;
        else {
                log_err ("Invalid parameter \"%s\".", param_str);
                return 1;
@@@ -595,6 -1057,103 +1073,103 @@@ static int config_query_callback (udb_q
        return (-1);
  } /* config_query_callback */
  
+ static int config_add_writer (oconfig_item_t *ci,
+               c_psql_writer_t *src_writers, size_t src_writers_num,
+               c_psql_writer_t ***dst_writers, size_t *dst_writers_num)
+ {
+       char *name;
+       size_t i;
+       if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL))
+               return -1;
+       if ((ci->values_num != 1)
+                       || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+               log_err ("`Writer' expects a single string argument.");
+               return 1;
+       }
+       name = ci->values[0].value.string;
+       for (i = 0; i < src_writers_num; ++i) {
+               c_psql_writer_t **tmp;
+               if (strcasecmp (name, src_writers[i].name) != 0)
+                       continue;
+               tmp = (c_psql_writer_t **)realloc (*dst_writers,
+                               sizeof (**dst_writers) * (*dst_writers_num + 1));
+               if (tmp == NULL) {
+                       log_err ("Out of memory.");
+                       return -1;
+               }
+               tmp[*dst_writers_num] = src_writers + i;
+               *dst_writers = tmp;
+               ++(*dst_writers_num);
+               break;
+       }
+       if (i >= src_writers_num) {
+               log_err ("No such writer: `%s'", name);
+               return -1;
+       }
+       return 0;
+ } /* config_add_writer */
+ static int c_psql_config_writer (oconfig_item_t *ci)
+ {
+       c_psql_writer_t *writer;
+       c_psql_writer_t *tmp;
+       int status = 0;
+       int i;
+       if ((ci->values_num != 1)
+                       || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+               log_err ("<Writer> expects a single string argument.");
+               return 1;
+       }
+       tmp = (c_psql_writer_t *)realloc (writers,
+                       sizeof (*writers) * (writers_num + 1));
+       if (tmp == NULL) {
+               log_err ("Out of memory.");
+               return -1;
+       }
+       writers = tmp;
+       writer  = writers + writers_num;
+       ++writers_num;
+       writer->name = sstrdup (ci->values[0].value.string);
+       writer->statement = NULL;
+       writer->store_rates = 1;
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *c = ci->children + i;
+               if (strcasecmp ("Statement", c->key) == 0)
+                       status = cf_util_get_string (c, &writer->statement);
+               else if (strcasecmp ("StoreRates", c->key) == 0)
+                       status = cf_util_get_boolean (c, &writer->store_rates);
+               else
+                       log_warn ("Ignoring unknown config key \"%s\".", c->key);
+       }
+       if (status != 0) {
+               sfree (writer->statement);
+               sfree (writer->name);
+               sfree (writer);
+               return status;
+       }
+       return 0;
+ } /* c_psql_config_writer */
  static int c_psql_config_database (oconfig_item_t *ci)
  {
        c_psql_database_t *db;
        struct timespec cb_interval = { 0, 0 };
        user_data_t ud;
  
+       static _Bool have_flush = 0;
        int i;
  
        if ((1 != ci->values_num)
                        cf_util_get_string (c, &db->user);
                else if (0 == strcasecmp (c->key, "Password"))
                        cf_util_get_string (c, &db->password);
 +              else if (0 == strcasecmp (c->key, "Instance"))
 +                      cf_util_get_string (c, &db->instance);
                else if (0 == strcasecmp (c->key, "SSLMode"))
                        cf_util_get_string (c, &db->sslmode);
                else if (0 == strcasecmp (c->key, "KRBSrvName"))
                else if (0 == strcasecmp (c->key, "Query"))
                        udb_query_pick_from_list (c, queries, queries_num,
                                        &db->queries, &db->queries_num);
+               else if (0 == strcasecmp (c->key, "Writer"))
+                       config_add_writer (c, writers, writers_num,
+                                       &db->writers, &db->writers_num);
                else if (0 == strcasecmp (c->key, "Interval"))
                        cf_util_get_cdtime (c, &db->interval);
+               else if (strcasecmp ("CommitInterval", c->key) == 0)
+                       cf_util_get_cdtime (c, &db->commit_interval);
                else
                        log_warn ("Ignoring unknown config key \"%s\".", c->key);
        }
  
        /* If no `Query' options were given, add the default queries.. */
-       if (db->queries_num == 0) {
+       if ((db->queries_num == 0) && (db->writers_num == 0)){
                for (i = 0; i < def_queries_num; i++)
                        udb_query_pick_from_list_by_name (def_queries[i],
                                        queries, queries_num,
        ud.data = db;
        ud.free_func = c_psql_database_delete;
  
 -      ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database);
 +      ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance);
  
-       CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
+       if (db->queries_num > 0) {
+               CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
  
-       plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
-                       /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
-                       &ud);
+               ++db->ref_cnt;
+               plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
+                               /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
+                               &ud);
+       }
+       if (db->writers_num > 0) {
+               ++db->ref_cnt;
+               plugin_register_write (cb_name, c_psql_write, &ud);
+               if (! have_flush) {
+                       /* flush all */
+                       plugin_register_flush ("postgresql",
+                                       c_psql_flush, /* user data = */ NULL);
+                       have_flush = 1;
+               }
+               /* flush this connection only */
+               ++db->ref_cnt;
+               plugin_register_flush (cb_name, c_psql_flush, &ud);
+       }
+       else if (db->commit_interval > 0) {
+               log_warn ("Database '%s': You do not have any writers assigned to "
+                               "this database connection. Setting 'CommitInterval' does "
+                               "not have any effect.", db->database);
+       }
        return 0;
  } /* c_psql_config_database */
  
@@@ -721,6 -1308,8 +1326,8 @@@ static int c_psql_config (oconfig_item_
                if (0 == strcasecmp (c->key, "Query"))
                        udb_query_create (&queries, &queries_num, c,
                                        /* callback = */ config_query_callback);
+               else if (0 == strcasecmp (c->key, "Writer"))
+                       c_psql_config_writer (c);
                else if (0 == strcasecmp (c->key, "Database"))
                        c_psql_config_database (c);
                else