=item B<PIDFile> I<File>
Sets where to write the PID file to. This file is overwritten when it exists
-and deleted when the program ist stopped. Some init-scripts might override this
-setting using the B<-P> commandline option.
+and deleted when the program is stopped. Some init-scripts might override this
+setting using the B<-P> command-line option.
=item B<PluginDir> I<Directory>
=head1 PLUGIN OPTIONS
-Some Plugins may register own options. These options must be enclosed in a
+Some plugins may register own options. These options must be enclosed in a
C<Plugin>-Section. Which options exist depends on the plugin used. Some plugins
require external configuration, too. The C<apache plugin>, for example,
required C<mod_status> to be configured in the webserver you're going to
=item B<DataDir> I<Directory>
- Set the directory to store RRD-files under. Per default RRD-files are generated
+ Set the directory to store CSV-files under. Per default CSV-files are generated
beneath the daemon's working directory, i.E<nbsp>e. the B<BaseDir>.
=back
=over 4
+=item B<SocketFile> I<Path>
+
+Sets the socket-file which is to be created.
+
=item B<SocketGroup> I<Group>
If running as root change the group of the UNIX-socket after it has been
=over 4
-=item B<Exec> I<User> I<Executable>
+=item B<Exec> I<User>[:[I<Group>]] I<Executable>
+
+Execute the executable I<Executable> as user I<User>. If the user name is
+followed by a colon and a group name, the effective group is set to that group.
+The real group and saved-set group will be set to the default group of that
+user. If no group is given the effective group ID will be the same as the real
+group ID.
-Execute the executable I<Executable> as user I<User>.
+Please note that in order to change the user and/or group the daemon needs
+superuser privileges. If the daemon is run as an unprivileged user you must
+specify the same user/group here. If the daemon is run with superuser
+privileges, you must supply a non-root user here.
=back
The C<mysql plugin> requires B<mysqlclient> to be installed. It connects to the
database when started and keeps the connection up as long as possible. When the
connection is interrupted for whatever reason it will try to re-connect. The
- plugin logs loud complaints in case anything goes wrong.
+ plugin will complaint loudly in case anything goes wrong.
This plugin issues C<SHOW STATUS> and evaluates C<Bytes_{received,sent}>,
C<Com_*> and C<Handler_*> which correspond to F<mysql_octets.rrd>,
=back
+=head2 Plugin C<netlink>
+
+The C<netlink> plugin uses a netlink socket to query the Linux kernel about
+statistics of various interface and routing aspects.
+
+=over 4
+
+=item B<Interface> I<Interface>
+
+=item B<VerboseInterface> I<Interface>
+
+Instruct the plugin to collect interface statistics. This is basically the same
+as the statistics provided by the C<interface> plugin (see above) but
+potentially much more detailed.
+
+When configuring with B<Interface> only the basic statistics will be collected,
+namely octets, packets, and errors. These statistics are collected by
+the C<interface> plugin, too, so using both at the same time is no benefit.
+
+When configured with B<VerboseInterface> all counters B<except> the basic ones,
+so that no data needs to be collected twice if you use the C<interface> plugin.
+This includes dropped packets, received multicast packets, collisions and a
+whole zoo of differentiated RX and TX errors. You can try the following command
+to get an idea of what awaits you:
+
+ ip -s -s link list
+
+If I<Interface> is B<All>, all interfaces will be selected.
+
+=item B<QDisc> I<Interface> [I<QDisc>]
+
+=item B<Class> I<Interface> [I<Class>]
+
+=item B<Filter> I<Interface> [I<Filter>]
+
+Collect the octets and packets that pass a certain qdisc, class or filter.
+
+QDiscs and classes are identified by their type and handle (or classid).
+Filters don't necessarily have a handle, therefore the parent's handle is used.
+The notation used in collectd differs from that used in tc(1) in that it
+doesn't skip the major or minor number if it's zero and doesn't print special
+ids by their name. So, for example, a qdisc may be identified by
+C<pfifo_fast-1:0> even though the minor number of B<all> qdiscs is zero and
+thus not displayed by tc(1).
+
+If B<QDisc>, B<Class>, or B<Filter> is given without the second argument,
+i.E<nbsp>.e. without an identifier, all qdiscs, classes, or filters that are
+associated with that interface will be collected.
+
+Since a filter itself doesn't necessarily have a handle, the parent's handle is
+used. This may lead to problems when more than one filter is attached to a
+qdisc or class. This isn't nice, but we don't know how this could be done any
+better. If you have a idea, please don't hesitate to tell us.
+
+As with the B<Interface> option you can specify B<All> as the interface,
+meaning all interfaces.
+
+Here are some examples to help you understand the above text more easily:
+
+ <Plugin netlink>
+ VerboseInterface "All"
+ QDisc "eth0" "pfifo_fast-1:0"
+ QDisc "ppp0"
+ Class "ppp0" "htb-1:10"
+ Filter "ppp0" "u32-1:0"
+ </Plugin>
+
+=item B<IgnoreSelected>
+
+The behaviour is the same as with all other similar plugins: If nothing is
+selected at all, everything is collected. If some things are selected using the
+options described above, only these statistics are collected. If you set
+B<IgnoreSelected> to B<true>, this behavior is inversed, i.E<nbsp>e. the
+specified statistics will not be collected.
+
+=back
+
=head2 Plugin C<network>
=over 4
=head2 Plugin C<rrdtool>
You can use the settings B<StepSize>, B<HeartBeat>, B<RRARows>, and B<XFF> to
-finetune your RRD-files. Please read L<rrdcreate(1)> if you encounter problems
+fine-tune your RRD-files. Please read L<rrdcreate(1)> if you encounter problems
using these settings. If you don't want to dive into the depths of RRDTool, you
can safely ignore these settings.
=item B<StepSize> I<Seconds>
-Sets the stepsize of newly created RRD-files. Ideally (and per default) this
-setting is identical to the global B<Interval>-option and should not be
-smaller. If unsure, don't set this option.
+B<Force> the stepsize of newly created RRD-files. Ideally (and per default)
+this setting is unset and the stepsize is set to the interval in which the data
+is collected. Do not use this option unless you absolutely have to for some
+reason. Setting this option may cause problems with the C<snmp plugin>, the
+C<exec plugin> or when the daemon is set up to receive data from other hosts.
=item B<HeartBeat> I<Seconds>
-Sets the heartbeat of newly created RRD-files. Ideally this setting is bigger
-than the B<Interval>-setting, by default it's twice the B<Interval>-setting. If
-unsure, don't set this option.
+B<Force> the heartbeat of newly created RRD-files. This setting should be unset
+in which case the heartbeat is set to twice the B<StepSize> which should equal
+the interval in which data is collected. Do not set this option unless you have
+a very good reason to do so.
=item B<RRARows> I<NumRows>
If this option is set to a value greater than zero, the C<rrdtool plugin> will
save values in a cache, as described above. Writing multiple values at once
reduces IO-operations and thus lessens the load produced by updating the files.
-The tradeoff is that the graphs kind of "drag behind" and that more memory is
+The trade off is that the graphs kind of "drag behind" and that more memory is
used.
=back
=back
+=head2 Plugin C<snmp>
+
+Since the configuration of the C<snmp plugin> is a little more complicated than
+other plugins, its documentation has been moved to an own manpage,
+L<collectd-snmp(5)>. Please see there for details.
+
=head2 Plugin C<syslog>
=over 4
char **values;
time_t first_value;
time_t last_value;
+ enum
+ {
+ FLAG_NONE = 0x00,
+ FLAG_QUEUED = 0x01
+ } flags;
};
typedef struct rrd_cache_s rrd_cache_t;
+struct rrd_queue_s
+{
+ char *filename;
+ struct rrd_queue_s *next;
+};
+typedef struct rrd_queue_s rrd_queue_t;
+
/*
* Private variables
*/
};
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+/* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
+ * is zero a default, depending on the `interval' member of the value list is
+ * being used. */
static char *datadir = NULL;
static int stepsize = 0;
static int heartbeat = 0;
static int rrarows = 1200;
static double xff = 0.1;
+/* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
+ * ALWAYS lock `cache_lock' first! */
static int cache_timeout = 0;
static int cache_flush_timeout = 0;
static time_t cache_flush_last;
static avl_tree_t *cache = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+static rrd_queue_t *queue_head = NULL;
+static rrd_queue_t *queue_tail = NULL;
+static pthread_t queue_thread = 0;
+static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
+
+#if !HAVE_THREADSAFE_LIBRRD
+static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
+#endif
+
+static int do_shutdown = 0;
+
/* * * * * * * * * *
* WARNING: Magic *
* * * * * * * * * */
-static int rra_get (char ***ret)
+
+static void rra_free (int rra_num, char **rra_def)
+{
+ int i;
+
+ for (i = 0; i < rra_num; i++)
+ {
+ sfree (rra_def[i]);
+ }
+ sfree (rra_def);
+} /* void rra_free */
+
+static int rra_get (char ***ret, const value_list_t *vl)
{
- static char **rra_def = NULL;
- static int rra_num = 0;
+ char **rra_def;
+ int rra_num;
int *rts;
int rts_num;
char buffer[64];
- if ((rra_num != 0) && (rra_def != NULL))
+ /* The stepsize we use here: If it is user-set, use it. If not, use the
+ * interval of the value-list. */
+ int ss;
+
+ if (rrarows <= 0)
+ {
+ *ret = NULL;
+ return (-1);
+ }
+
+ ss = (stepsize > 0) ? stepsize : vl->interval;
+ if (ss <= 0)
{
- *ret = rra_def;
- return (rra_num);
+ *ret = NULL;
+ return (-1);
}
/* Use the configured timespans or fall back to the built-in defaults */
if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
return (-1);
memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
-
- if ((stepsize <= 0) || (rrarows <= 0))
- {
- *ret = NULL;
- return (-1);
- }
+ rra_num = 0;
cdp_len = 0;
for (i = 0; i < rts_num; i++)
{
span = rts[i];
- if ((span / stepsize) < rrarows)
+ if ((span / ss) < rrarows)
continue;
if (cdp_len == 0)
cdp_len = 1;
else
cdp_len = (int) floor (((double) span)
- / ((double) (rrarows * stepsize)));
+ / ((double) (rrarows * ss)));
cdp_num = (int) ceil (((double) span)
- / ((double) (cdp_len * stepsize)));
+ / ((double) (cdp_len * ss)));
for (j = 0; j < rra_types_num; j++)
{
*ret = rra_def;
return (rra_num);
-}
+} /* int rra_get */
static void ds_free (int ds_num, char **ds_def)
{
free (ds_def);
}
-static int ds_get (char ***ret, const data_set_t *ds)
+static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
{
char **ds_def;
int ds_num;
status = snprintf (buffer, sizeof (buffer),
"DS:%s:%s:%i:%s:%s",
- d->name, type, heartbeat,
+ d->name, type,
+ (heartbeat > 0) ? heartbeat : (2 * vl->interval),
min, max);
if ((status < 1) || (status >= sizeof (buffer)))
break;
return (ds_num);
}
+#if HAVE_THREADSAFE_LIBRRD
+static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
+ int argc, char **argv)
+{
+ int status;
+
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_create_r (filename, pdp_step, last_up, argc, argv);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s",
+ filename, rrd_get_error ());
+ }
+
+ return (status);
+} /* int srrd_create */
+
+static int srrd_update (char *filename, char *template, int argc, char **argv)
+{
+ int status;
+
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_update_r (filename, template, argc, argv);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
+ filename, rrd_get_error ());
+ }
+
+ return (status);
+} /* int srrd_update */
+/* #endif HAVE_THREADSAFE_LIBRRD */
+
+#else /* !HAVE_THREADSAFE_LIBRRD */
+static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
+ int argc, char **argv)
+{
+ int status;
+
+ int new_argc;
+ char **new_argv;
+
+ char pdp_step_str[16];
+ char last_up_str[16];
+
+ new_argc = 6 + argc;
+ new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
+ if (new_argv == NULL)
+ {
+ ERROR ("rrdtool plugin: malloc failed.");
+ return (-1);
+ }
+
+ if (last_up == 0)
+ last_up = time (NULL) - 10;
+
+ snprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step);
+ pdp_step_str[sizeof (pdp_step_str) - 1] = '\0';
+ snprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up);
+ last_up_str[sizeof (last_up_str) - 1] = '\0';
+
+ new_argv[0] = "create";
+ new_argv[1] = filename;
+ new_argv[2] = "-s";
+ new_argv[3] = pdp_step_str;
+ new_argv[4] = "-b";
+ new_argv[5] = last_up_str;
+
+ memcpy (new_argv + 6, argv, argc * sizeof (char *));
+ new_argv[new_argc] = NULL;
+
+ pthread_mutex_lock (&librrd_lock);
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_create (new_argc, new_argv);
+ pthread_mutex_unlock (&librrd_lock);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_create (%s) failed: %s",
+ filename, rrd_get_error ());
+ }
+
+ sfree (new_argv);
+
+ return (status);
+} /* int srrd_create */
+
+static int srrd_update (char *filename, char *template, int argc, char **argv)
+{
+ int status;
+
+ int new_argc;
+ char **new_argv;
+
+ assert (template == NULL);
+
+ new_argc = 2 + argc;
+ new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
+ if (new_argv == NULL)
+ {
+ ERROR ("rrdtool plugin: malloc failed.");
+ return (-1);
+ }
+
+ new_argv[0] = "update";
+ new_argv[1] = filename;
+
+ memcpy (new_argv + 2, argv, argc * sizeof (char *));
+ new_argv[new_argc] = NULL;
+
+ pthread_mutex_lock (&librrd_lock);
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_update (new_argc, new_argv);
+ pthread_mutex_unlock (&librrd_lock);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
+ argv[1], rrd_get_error ());
+ }
+
+ sfree (new_argv);
+
+ return (status);
+} /* int srrd_update */
+#endif /* !HAVE_THREADSAFE_LIBRRD */
+
static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
{
char **argv;
int rra_num;
char **ds_def;
int ds_num;
- int i, j;
- char stepsize_str[16];
- char begin_str[16];
int status = 0;
if (check_create_dir (filename))
return (-1);
- if ((rra_num = rra_get (&rra_def)) < 1)
+ if ((rra_num = rra_get (&rra_def, vl)) < 1)
{
ERROR ("rrd_create_file failed: Could not calculate RRAs");
return (-1);
}
- if ((ds_num = ds_get (&ds_def, ds)) < 1)
+ if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
{
ERROR ("rrd_create_file failed: Could not calculate DSes");
return (-1);
}
- argc = ds_num + rra_num + 6;
+ argc = ds_num + rra_num;
if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
{
return (-1);
}
- status = snprintf (stepsize_str, sizeof (stepsize_str),
- "%i", stepsize);
- if ((status < 1) || (status >= sizeof (stepsize_str)))
- {
- ERROR ("rrdtool plugin: snprintf failed.");
- return (-1);
- }
+ memcpy (argv, ds_def, ds_num * sizeof (char *));
+ memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *));
+ argv[ds_num + rra_num] = NULL;
assert (vl->time > 10);
- status = snprintf (begin_str, sizeof (begin_str),
- "%llu", (unsigned long long) (vl->time - 10));
- if ((status < 1) || (status >= sizeof (begin_str)))
- {
- ERROR ("rrdtool plugin: snprintf failed.");
- return (-1);
- }
-
- argv[0] = "create";
- argv[1] = filename;
- argv[2] = "-b";
- argv[3] = begin_str;
- argv[4] = "-s";
- argv[5] = stepsize_str;
-
- j = 6;
- for (i = 0; i < ds_num; i++)
- argv[j++] = ds_def[i];
- for (i = 0; i < rra_num; i++)
- argv[j++] = rra_def[i];
- argv[j] = NULL;
-
- optind = 0; /* bug in librrd? */
- rrd_clear_error ();
- if (rrd_create (argc, argv) == -1)
- {
- ERROR ("rrd_create failed: %s: %s", filename, rrd_get_error ());
- status = -1;
- }
+ status = srrd_create (filename,
+ (stepsize > 0) ? stepsize : vl->interval,
+ vl->time - 10,
+ argc, argv);
free (argv);
ds_free (ds_num, ds_def);
+ rra_free (rra_num, rra_def);
return (status);
}
return (0);
} /* int value_list_to_filename */
-static rrd_cache_t *rrd_cache_insert (const char *filename,
- const char *value, time_t value_time)
+static void *rrd_queue_thread (void *data)
{
- rrd_cache_t *rc = NULL;
- int new_rc = 0;
-
- if (cache != NULL)
- avl_get (cache, filename, (void *) &rc);
-
- if (rc == NULL)
+ while (42)
{
- rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
- if (rc == NULL)
- return (NULL);
- rc->values_num = 0;
- rc->values = NULL;
- rc->first_value = 0;
- rc->last_value = 0;
- new_rc = 1;
- }
-
- if (rc->last_value >= value_time)
- {
- WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
- (unsigned int) rc->last_value,
- (unsigned int) value_time);
- return (NULL);
- }
-
- rc->values = (char **) realloc ((void *) rc->values,
- (rc->values_num + 1) * sizeof (char *));
- if (rc->values == NULL)
- {
- char errbuf[1024];
- ERROR ("rrdtool plugin: realloc failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- if (cache != NULL)
+ rrd_queue_t *queue_entry;
+ rrd_cache_t *cache_entry;
+ char **values;
+ int values_num;
+ int i;
+
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+
+ /* wait until an entry is available */
+ pthread_mutex_lock (&queue_lock);
+ while ((queue_head == NULL) && (do_shutdown == 0))
+ pthread_cond_wait (&queue_cond, &queue_lock);
+
+ /* We're in the shutdown phase */
+ if (queue_head == NULL)
{
- void *cache_key = NULL;
- avl_remove (cache, filename, &cache_key, NULL);
- sfree (cache_key);
+ pthread_mutex_unlock (&queue_lock);
+ break;
}
- free (rc);
- return (NULL);
- }
- rc->values[rc->values_num] = strdup (value);
- if (rc->values[rc->values_num] != NULL)
- rc->values_num++;
+ /* Dequeue the first entry */
+ queue_entry = queue_head;
+ if (queue_head == queue_tail)
+ queue_head = queue_tail = NULL;
+ else
+ queue_head = queue_head->next;
- if (rc->values_num == 1)
- rc->first_value = value_time;
- rc->last_value = value_time;
+ /* Unlock the queue again */
+ pthread_mutex_unlock (&queue_lock);
- /* Insert if this is the first value */
- if ((cache != NULL) && (new_rc == 1))
- {
- void *cache_key = strdup (filename);
+ /* We now need the cache lock so the entry isn't updated while
+ * we make a copy of it's values */
+ pthread_mutex_lock (&cache_lock);
- if (cache_key == NULL)
- {
- char errbuf[1024];
- ERROR ("rrdtool plugin: strdup failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- sfree (rc->values[0]);
- sfree (rc->values);
- sfree (rc);
- return (NULL);
- }
+ avl_get (cache, queue_entry->filename, (void *) &cache_entry);
- avl_insert (cache, cache_key, rc);
- }
+ values = cache_entry->values;
+ values_num = cache_entry->values_num;
- DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value,
- (unsigned int) value_time, (void *) rc);
+ cache_entry->values = NULL;
+ cache_entry->values_num = 0;
+ cache_entry->flags = FLAG_NONE;
- return (rc);
-} /* rrd_cache_t *rrd_cache_insert */
+ pthread_mutex_unlock (&cache_lock);
-static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc)
-{
- char **argv;
- int argc;
+ /* Write the values to the RRD-file */
+ srrd_update (queue_entry->filename, NULL, values_num, values);
- char *fn;
- int status;
+ for (i = 0; i < values_num; i++)
+ {
+ sfree (values[i]);
+ }
+ sfree (values);
+ sfree (queue_entry->filename);
+ sfree (queue_entry);
+ } /* while (42) */
- int i;
+ pthread_mutex_lock (&cache_lock);
+ avl_destroy (cache);
+ cache = NULL;
+ pthread_mutex_unlock (&cache_lock);
- if (rc->values_num < 1)
- return (0);
+ pthread_exit ((void *) 0);
+ return ((void *) 0);
+} /* void *rrd_queue_thread */
- argc = rc->values_num + 2;
- argv = (char **) malloc ((argc + 1) * sizeof (char *));
- if (argv == NULL)
+static int rrd_queue_cache_entry (const char *filename)
+{
+ rrd_queue_t *queue_entry;
+
+ queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
+ if (queue_entry == NULL)
return (-1);
- fn = strdup (filename);
- if (fn == NULL)
+ queue_entry->filename = strdup (filename);
+ if (queue_entry->filename == NULL)
{
- free (argv);
+ free (queue_entry);
return (-1);
}
- argv[0] = "update";
- argv[1] = fn;
- memcpy (argv + 2, rc->values, rc->values_num * sizeof (char *));
- argv[argc] = NULL;
-
- DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv);
+ queue_entry->next = NULL;
- optind = 0; /* bug in librrd? */
- rrd_clear_error ();
- status = rrd_update (argc, argv);
- if (status != 0)
- {
- WARNING ("rrd_update failed: %s: %s",
- filename, rrd_get_error ());
- status = -1;
- }
+ pthread_mutex_lock (&queue_lock);
+ if (queue_tail == NULL)
+ queue_head = queue_entry;
+ else
+ queue_tail->next = queue_entry;
+ queue_tail = queue_entry;
+ pthread_cond_signal (&queue_cond);
+ pthread_mutex_unlock (&queue_lock);
- free (argv);
- free (fn);
- /* Free the value list of `rc' */
- for (i = 0; i < rc->values_num; i++)
- free (rc->values[i]);
- free (rc->values);
- rc->values = NULL;
- rc->values_num = 0;
+ DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
- return (status);
-} /* int rrd_write_cache_entry */
+ return (0);
+} /* int rrd_queue_cache_entry */
static void rrd_cache_flush (int timeout)
{
avl_iterator_t *iter;
int i;
- DEBUG ("Flushing cache, timeout = %i", timeout);
- if (cache == NULL)
- return;
-
+ DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
now = time (NULL);
while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
{
DEBUG ("key = %s; age = %i;", key, now - rc->first_value);
- if ((now - rc->first_value) >= timeout)
+
+ if (rc->flags == FLAG_QUEUED)
+ continue;
+ else if ((now - rc->first_value) < timeout)
+ continue;
+ else if (rc->values_num > 0)
+ {
+ if (rrd_queue_cache_entry (key) == 0)
+ rc->flags = FLAG_QUEUED;
+ }
+ else /* ancient and no values -> waste of memory */
{
keys = (char **) realloc ((void *) keys,
(keys_num + 1) * sizeof (char *));
{
if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
{
- DEBUG ("avl_remove (%s) failed.", keys[i]);
+ DEBUG ("rrdtool plugin: avl_remove (%s) failed.", keys[i]);
continue;
}
- rrd_write_cache_entry (keys[i], rc);
- /* rc's value-list is free's by `rrd_write_cache_entry' */
+ assert (rc->values == NULL);
+ assert (rc->values_num == 0);
+
sfree (rc);
sfree (key);
keys[i] = NULL;
} /* for (i = 0..keys_num) */
free (keys);
- DEBUG ("Flushed %i value(s)", keys_num);
cache_flush_last = now;
} /* void rrd_cache_flush */
+static int rrd_cache_insert (const char *filename,
+ const char *value, time_t value_time)
+{
+ rrd_cache_t *rc = NULL;
+ int new_rc = 0;
+ char **values_new;
+
+ pthread_mutex_lock (&cache_lock);
+
+ avl_get (cache, filename, (void *) &rc);
+
+ if (rc == NULL)
+ {
+ rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
+ if (rc == NULL)
+ return (-1);
+ rc->values_num = 0;
+ rc->values = NULL;
+ rc->first_value = 0;
+ rc->last_value = 0;
+ rc->flags = FLAG_NONE;
+ new_rc = 1;
+ }
+
+ if (rc->last_value >= value_time)
+ {
+ pthread_mutex_unlock (&cache_lock);
+ WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
+ (unsigned int) rc->last_value,
+ (unsigned int) value_time);
+ return (-1);
+ }
+
+ values_new = (char **) realloc ((void *) rc->values,
+ (rc->values_num + 1) * sizeof (char *));
+ if (values_new == NULL)
+ {
+ char errbuf[1024];
+ void *cache_key = NULL;
+
+ sstrerror (errno, errbuf, sizeof (errbuf));
+
+ avl_remove (cache, filename, &cache_key, NULL);
+ pthread_mutex_unlock (&cache_lock);
+
+ ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
+
+ sfree (cache_key);
+ sfree (rc->values);
+ sfree (rc);
+ return (-1);
+ }
+ rc->values = values_new;
+
+ rc->values[rc->values_num] = strdup (value);
+ if (rc->values[rc->values_num] != NULL)
+ rc->values_num++;
+
+ if (rc->values_num == 1)
+ rc->first_value = value_time;
+ rc->last_value = value_time;
+
+ /* Insert if this is the first value */
+ if (new_rc == 1)
+ {
+ void *cache_key = strdup (filename);
+
+ if (cache_key == NULL)
+ {
+ char errbuf[1024];
+ sstrerror (errno, errbuf, sizeof (errbuf));
+
+ pthread_mutex_unlock (&cache_lock);
+
+ ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
+
+ sfree (rc->values[0]);
+ sfree (rc->values);
+ sfree (rc);
+ return (-1);
+ }
+
+ avl_insert (cache, cache_key, rc);
+ }
+
+ DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value,
+ (unsigned int) value_time, (void *) rc);
+
+ if ((rc->last_value - rc->first_value) >= cache_timeout)
+ {
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+ if (rc->flags != FLAG_QUEUED)
+ {
+ if (rrd_queue_cache_entry (filename) == 0)
+ rc->flags = FLAG_QUEUED;
+ }
+ else
+ {
+ DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
+ }
+ }
+
+ if ((cache_timeout > 0) &&
+ ((time (NULL) - cache_flush_last) > cache_flush_timeout))
+ rrd_cache_flush (cache_flush_timeout);
+
+
+ pthread_mutex_unlock (&cache_lock);
+
+ return (0);
+} /* int rrd_cache_insert */
+
static int rrd_write (const data_set_t *ds, const value_list_t *vl)
{
struct stat statbuf;
char filename[512];
char values[512];
- rrd_cache_t *rc;
- time_t now;
+ int status;
if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
return (-1);
return (-1);
}
- pthread_mutex_lock (&cache_lock);
- rc = rrd_cache_insert (filename, values, vl->time);
- if (rc == NULL)
- {
- pthread_mutex_unlock (&cache_lock);
- return (-1);
- }
-
- if (cache == NULL)
- {
- rrd_write_cache_entry (filename, rc);
- /* rc's value-list is free's by `rrd_write_cache_entry' */
- sfree (rc);
- pthread_mutex_unlock (&cache_lock);
- return (0);
- }
-
- now = time (NULL);
+ status = rrd_cache_insert (filename, values, vl->time);
- DEBUG ("age (%s) = %i", filename, now - rc->first_value);
-
- /* `rc' is not free'd here, because we'll likely reuse it. If not, then
- * the next flush will remove this entry. */
- if ((now - rc->first_value) >= cache_timeout)
- rrd_write_cache_entry (filename, rc);
-
- if ((now - cache_flush_last) >= cache_flush_timeout)
- rrd_cache_flush (cache_flush_timeout);
-
- pthread_mutex_unlock (&cache_lock);
- return (0);
+ return (status);
} /* int rrd_write */
static int rrd_config (const char *key, const char *value)
}
else if (strcasecmp ("StepSize", key) == 0)
{
- int tmp = atoi (value);
- if (tmp <= 0)
- {
- fprintf (stderr, "rrdtool: `StepSize' must "
- "be greater than 0.\n");
- return (1);
- }
- stepsize = tmp;
+ stepsize = atoi (value);
+ if (stepsize < 0)
+ stepsize = 0;
}
else if (strcasecmp ("HeartBeat", key) == 0)
{
- int tmp = atoi (value);
- if (tmp <= 0)
- {
- fprintf (stderr, "rrdtool: `HeartBeat' must "
- "be greater than 0.\n");
- return (1);
- }
- heartbeat = tmp;
+ heartbeat = atoi (value);
+ if (heartbeat < 0)
+ heartbeat = 0;
}
else if (strcasecmp ("RRARows", key) == 0)
{
{
pthread_mutex_lock (&cache_lock);
rrd_cache_flush (-1);
- if (cache != NULL)
- avl_destroy (cache);
- cache = NULL;
pthread_mutex_unlock (&cache_lock);
+ /* Wait for all the values to be written to disk before returning. */
+ if (queue_thread != 0)
+ {
+ pthread_join (queue_thread, NULL);
+ queue_thread = 0;
+ DEBUG ("rrdtool plugin: queue_thread exited.");
+ }
+
+ pthread_mutex_lock (&queue_lock);
+ do_shutdown = 1;
+ pthread_cond_signal (&queue_cond);
+ pthread_mutex_unlock (&queue_lock);
+
return (0);
} /* int rrd_shutdown */
static int rrd_init (void)
{
- if (stepsize <= 0)
- stepsize = interval_g;
+ int status;
+
+ if (stepsize < 0)
+ stepsize = 0;
if (heartbeat <= 0)
- heartbeat = 2 * interval_g;
+ {
+ if (stepsize > 0)
+ heartbeat = 2 * stepsize;
+ else
+ heartbeat = 0;
+ }
- if (heartbeat < interval_g)
+ if ((heartbeat > 0) && (heartbeat < interval_g))
WARNING ("rrdtool plugin: Your `heartbeat' is "
"smaller than your `interval'. This will "
"likely cause problems.");
- else if (stepsize < interval_g)
+ else if ((stepsize > 0) && (stepsize < interval_g))
WARNING ("rrdtool plugin: Your `stepsize' is "
"smaller than your `interval'. This will "
"create needlessly big RRD-files.");
+ /* Set the cache up */
pthread_mutex_lock (&cache_lock);
+
+ cache = avl_create ((int (*) (const void *, const void *)) strcmp);
+ if (cache == NULL)
+ {
+ ERROR ("rrdtool plugin: avl_create failed.");
+ return (-1);
+ }
+
+ cache_flush_last = time (NULL);
if (cache_timeout < 2)
{
cache_timeout = 0;
cache_flush_timeout = 0;
}
- else
- {
- if (cache_flush_timeout < cache_timeout)
- cache_flush_timeout = 10 * cache_timeout;
+ else if (cache_flush_timeout < cache_timeout)
+ cache_flush_timeout = 10 * cache_timeout;
- cache = avl_create ((int (*) (const void *, const void *)) strcmp);
- cache_flush_last = time (NULL);
- plugin_register_shutdown ("rrdtool", rrd_shutdown);
- }
pthread_mutex_unlock (&cache_lock);
+ status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
+ if (status != 0)
+ {
+ ERROR ("rrdtool plugin: Cannot create queue-thread.");
+ return (-1);
+ }
+
DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
" heartbeat = %i; rrarows = %i; xff = %lf;",
(datadir == NULL) ? "(null)" : datadir,
config_keys, config_keys_num);
plugin_register_init ("rrdtool", rrd_init);
plugin_register_write ("rrdtool", rrd_write);
+ plugin_register_shutdown ("rrdtool", rrd_shutdown);
}
#include "common.h"
#include "plugin.h"
#include "configfile.h"
+#include "utils_cmd_putval.h"
/* Folks without pthread will need to disable this plugin. */
#include <pthread.h>
# define UNIX_PATH_MAX sizeof (((struct sockaddr_un *)0)->sun_path)
#endif
-#define US_DEFAULT_PATH PREFIX"/var/run/"PACKAGE_NAME"-unixsock"
+#define US_DEFAULT_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-unixsock"
/*
* Private data structures
} /* while (this != NULL) */
pthread_mutex_unlock (&cache_lock);
-} /* int cache_flush */
+} /* void cache_flush */
static int us_open_socket (void)
{
return (0);
} /* int us_handle_getval */
-static int us_handle_putval (FILE *fh, char **fields, int fields_num)
+static int us_handle_listval (FILE *fh, char **fields, int fields_num)
{
- char *hostname;
- char *plugin;
- char *plugin_instance;
- char *type;
- char *type_instance;
- int status;
- int i;
-
- const data_set_t *ds;
- value_list_t vl = VALUE_LIST_INIT;
-
- char **value_ptr;
+ char buffer[1024];
+ char **value_list = NULL;
+ int value_list_len = 0;
+ value_cache_t *entry;
+ int i;
- if (fields_num != 3)
+ if (fields_num != 1)
{
- DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num);
- fprintf (fh, "-1 Wrong number of fields: Got %i, expected 3.\n",
+ DEBUG ("unixsock plugin: us_handle_listval: "
+ "Wrong number of fields: %i", fields_num);
+ fprintf (fh, "-1 Wrong number of fields: Got %i, expected 1.\n",
fields_num);
fflush (fh);
return (-1);
}
- status = parse_identifier (fields[1], &hostname,
- &plugin, &plugin_instance,
- &type, &type_instance);
- if (status != 0)
- {
- DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]);
- fprintf (fh, "-1 Cannot parse identifier.\n");
- fflush (fh);
- return (-1);
- }
-
- if ((strlen (hostname) >= sizeof (vl.host))
- || (strlen (plugin) >= sizeof (vl.plugin))
- || ((plugin_instance != NULL)
- && (strlen (plugin_instance) >= sizeof (vl.plugin_instance)))
- || ((type_instance != NULL)
- && (strlen (type_instance) >= sizeof (vl.type_instance))))
- {
- fprintf (fh, "-1 Identifier too long.");
- return (-1);
- }
-
- strcpy (vl.host, hostname);
- strcpy (vl.plugin, plugin);
- if (plugin_instance != NULL)
- strcpy (vl.plugin_instance, plugin_instance);
- if (type_instance != NULL)
- strcpy (vl.type_instance, type_instance);
-
- { /* parse the time */
- char *t = fields[2];
- char *v = strchr (t, ':');
- if (v == NULL)
- {
- fprintf (fh, "-1 No time found.");
- return (-1);
- }
- *v = '\0'; v++;
-
- vl.time = (time_t) atoi (t);
- if (vl.time == 0)
- vl.time = time (NULL);
-
- fields[2] = v;
- }
-
- ds = plugin_get_ds (type);
- if (ds == NULL)
- return (-1);
-
- value_ptr = (char **) calloc (ds->ds_num, sizeof (char *));
- if (value_ptr == NULL)
- {
- fprintf (fh, "-1 calloc failed.");
- return (-1);
- }
-
- { /* parse the value-list. It's colon-separated. */
- char *dummy;
- char *ptr;
- char *saveptr;
-
- i = 0;
- dummy = fields[2];
- saveptr = NULL;
- while ((ptr = strtok_r (dummy, ":", &saveptr)) != NULL)
- {
- dummy = NULL;
- if (i >= ds->ds_num)
- {
- i = ds->ds_num + 1;
- break;
- }
- value_ptr[i] = ptr;
- i++;
- }
-
- if (i != ds->ds_num)
- {
- sfree (value_ptr);
- fprintf (fh, "-1 Number of values incorrect: Got %i, "
- "expected %i.", i, ds->ds_num);
- return (-1);
- }
- } /* done parsing the value-list */
-
- vl.values_len = ds->ds_num;
- vl.values = (value_t *) malloc (vl.values_len * sizeof (value_t));
- if (vl.values == NULL)
- {
- sfree (value_ptr);
- fprintf (fh, "-1 malloc failed.");
- return (-1);
- }
- DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values);
+ pthread_mutex_lock (&cache_lock);
- for (i = 0; i < ds->ds_num; i++)
+ for (entry = cache_head; entry != NULL; entry = entry->next)
{
- if (strcmp (value_ptr[i], "U") == 0)
- vl.values[i].gauge = NAN;
- else if (ds->ds[i].type == DS_TYPE_COUNTER)
- vl.values[i].counter = atoll (value_ptr[i]);
- else if (ds->ds[i].type == DS_TYPE_GAUGE)
- vl.values[i].gauge = atof (value_ptr[i]);
- } /* for (i = 2 .. fields_num) */
+ char **tmp;
+
+ snprintf (buffer, sizeof (buffer), "%u %s\n",
+ (unsigned int) entry->time, entry->name);
+ buffer[sizeof (buffer) - 1] = '\0';
+
+ tmp = realloc (value_list, sizeof (char *) * (value_list_len + 1));
+ if (tmp == NULL)
+ continue;
+ value_list = tmp;
- plugin_dispatch_values (type, &vl);
+ value_list[value_list_len] = strdup (buffer);
- DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values);
+ if (value_list[value_list_len] != NULL)
+ value_list_len++;
+ } /* for (entry) */
- sfree (value_ptr);
- sfree (vl.values);
+ pthread_mutex_unlock (&cache_lock);
- fprintf (fh, "0 Success\n");
+ DEBUG ("unixsock plugin: us_handle_listval: value_list_len = %i", value_list_len);
+ fprintf (fh, "%i Values found\n", value_list_len);
+ for (i = 0; i < value_list_len; i++)
+ fputs (value_list[i], fh);
fflush (fh);
return (0);
-} /* int us_handle_putval */
+} /* int us_handle_listval */
static void *us_handle_client (void *arg)
{
}
else if (strcasecmp (fields[0], "putval") == 0)
{
- us_handle_putval (fh, fields, fields_num);
+ handle_putval (fh, fields, fields_num);
+ }
+ else if (strcasecmp (fields[0], "listval") == 0)
+ {
+ us_handle_listval (fh, fields, fields_num);
}
else
{
while (loop != 0)
{
- DEBUG ("Calling accept..");
+ DEBUG ("unixsock plugin: Calling accept..");
status = accept (sock_fd, NULL, NULL);
if (status < 0)
{