# collectd - Collectd.pm
-# Copyright (C) 2007 Sebastian Harl
+# Copyright (C) 2007, 2008 Sebastian Harl
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
plugin_register
plugin_unregister
plugin_dispatch_values
+ plugin_flush
+ plugin_flush_one
+ plugin_flush_all
+ plugin_dispatch_notification
plugin_log
) ],
'types' => [ qw(
TYPE_WRITE
TYPE_SHUTDOWN
TYPE_LOG
+ TYPE_NOTIF
+ TYPE_FLUSH
+ TYPE_CONFIG
TYPE_DATASET
) ],
'ds_types' => [ qw(
LOG_INFO
LOG_DEBUG
) ],
+ 'notif' => [ qw(
+ NOTIF_FAILURE
+ NOTIF_WARNING
+ NOTIF_OKAY
+ ) ],
+ 'globals' => [ qw(
+ $hostname_g
+ $interval_g
+ ) ],
);
{
foreach keys %EXPORT_TAGS;
}
+# global variables
+our $hostname_g;
+our $interval_g;
+
Exporter::export_ok_tags ('all');
my @plugins : shared = ();
+my %cf_callbacks : shared = ();
my %types = (
TYPE_INIT, "init",
TYPE_READ, "read",
TYPE_WRITE, "write",
TYPE_SHUTDOWN, "shutdown",
- TYPE_LOG, "log"
+ TYPE_LOG, "log",
+ TYPE_NOTIF, "notify",
+ TYPE_FLUSH, "flush"
);
foreach my $type (keys %types) {
sub plugin_call_all {
my $type = shift;
+ my %plugins;
+
our $cb_name = undef;
if (! defined $type) {
return;
}
- lock @plugins;
- foreach my $plugin (keys %{$plugins[$type]}) {
- my $p = $plugins[$type]->{$plugin};
+ {
+ lock %{$plugins[$type]};
+ %plugins = %{$plugins[$type]};
+ }
+
+ foreach my $plugin (keys %plugins) {
+ my $p = $plugins{$plugin};
my $status = 0;
if ($p->{'wait_left'} > 0) {
- # TODO: use interval_g
- $p->{'wait_left'} -= 10;
+ $p->{'wait_left'} -= $interval_g;
}
next if ($p->{'wait_left'} > 0);
$cb_name = $p->{'cb_name'};
$status = call_by_name (@_);
- if (! defined $status) {
+ if (! $status) {
+ my $err = undef;
+
+ if ($@) {
+ $err = $@;
+ }
+ else {
+ $err = "callback returned false";
+ }
+
if (TYPE_LOG != $type) {
- ERROR ("Could not execute callback \"$cb_name\": $@");
+ ERROR ("Execution of callback \"$cb_name\" failed: $err");
}
- next;
+ $status = 0;
}
if ($status) {
$p->{'wait_left'} = 0;
- $p->{'wait_time'} = 10;
+ $p->{'wait_time'} = $interval_g;
}
elsif (TYPE_READ == $type) {
+ if ($p->{'wait_time'} < $interval_g) {
+ $p->{'wait_time'} = $interval_g;
+ }
+
$p->{'wait_left'} = $p->{'wait_time'};
$p->{'wait_time'} *= 2;
. "Will suspend it for $p->{'wait_left'} seconds.");
}
elsif (TYPE_INIT == $type) {
+ ERROR ("${plugin}->init() failed with status $status. "
+ . "Plugin will be disabled.");
+
foreach my $type (keys %types) {
plugin_unregister ($type, $plugin);
}
-
- ERROR ("${plugin}->init() failed with status $status. "
- . "Plugin will be disabled.");
}
elsif (TYPE_LOG != $type) {
WARNING ("${plugin}->$types{$type}() failed with status $status.");
return;
}
- if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)) {
+ if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
+ && (TYPE_CONFIG != $type)) {
ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
return;
}
if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
return plugin_register_data_set ($name, $data);
}
+ elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
+ my $pkg = scalar caller;
+
+ if ($data !~ m/^$pkg\:\:/) {
+ $data = $pkg . "::" . $data;
+ }
+
+ lock %cf_callbacks;
+ $cf_callbacks{$name} = $data;
+ }
elsif ((TYPE_DATASET != $type) && (! ref $data)) {
my $pkg = scalar caller;
my %p : shared;
- if ($data !~ m/^$pkg/) {
+ if ($data !~ m/^$pkg\:\:/) {
$data = $pkg . "::" . $data;
}
- # TODO: make interval_g available at configuration time
%p = (
- wait_time => 10,
+ wait_time => $interval_g,
wait_left => 0,
cb_name => $data,
);
- lock @plugins;
+ lock %{$plugins[$type]};
$plugins[$type]->{$name} = \%p;
}
else {
if (TYPE_DATASET == $type) {
return plugin_unregister_data_set ($name);
}
+ elsif (TYPE_CONFIG == $type) {
+ lock %cf_callbacks;
+ delete $cf_callbacks{$name};
+ }
elsif (defined $plugins[$type]) {
- lock @plugins;
+ lock %{$plugins[$type]};
delete $plugins[$type]->{$name};
}
else {
}
}
+sub plugin_flush {
+ my %args = @_;
+
+ my $timeout = -1;
+ my @plugins = ();
+ my @ids = ();
+
+ DEBUG ("Collectd::plugin_flush:"
+ . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
+ . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
+ . (defined ($args{'identifiers'})
+ ? " identifiers = $args{'identifiers'}" : ""));
+
+ if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
+ $timeout = $args{'timeout'};
+ }
+
+ if (defined ($args{'plugins'})) {
+ if ("ARRAY" eq ref ($args{'plugins'})) {
+ @plugins = @{$args{'plugins'}};
+ }
+ else {
+ @plugins = ($args{'plugins'});
+ }
+ }
+ else {
+ @plugins = (undef);
+ }
+
+ if (defined ($args{'identifiers'})) {
+ if ("ARRAY" eq ref ($args{'identifiers'})) {
+ @ids = @{$args{'identifiers'}};
+ }
+ else {
+ @ids = ($args{'identifiers'});
+ }
+ }
+ else {
+ @ids = (undef);
+ }
+
+ foreach my $plugin (@plugins) {
+ foreach my $id (@ids) {
+ _plugin_flush($plugin, $timeout, $id);
+ }
+ }
+}
+
+sub plugin_flush_one {
+ my $timeout = shift;
+ my $name = shift;
+
+ WARNING ("Collectd::plugin_flush_one is deprecated - "
+ . "use Collectd::plugin_flush instead.");
+
+ if (! (defined ($timeout) && defined ($name))) {
+ ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
+ return;
+ }
+
+ plugin_flush (plugins => $name, timeout => $timeout);
+}
+
+sub plugin_flush_all {
+ my $timeout = shift;
+
+ WARNING ("Collectd::plugin_flush_all is deprecated - "
+ . "use Collectd::plugin_flush instead.");
+
+ if (! defined ($timeout)) {
+ ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
+ return;
+ }
+
+ plugin_flush (timeout => $timeout);
+}
+
+sub _plugin_dispatch_config {
+ my $plugin = shift;
+ my $config = shift;
+
+ our $cb_name = undef;
+
+ if (! (defined ($plugin) && defined ($config))) {
+ return;
+ }
+
+ if (! defined $cf_callbacks{$plugin}) {
+ WARNING ("Found a configuration for the \"$plugin\" plugin, but "
+ . "the plugin isn't loaded or didn't register "
+ . "a configuration callback.");
+ return;
+ }
+
+ {
+ lock %cf_callbacks;
+ $cb_name = $cf_callbacks{$plugin};
+ }
+ call_by_name ($config);
+}
+
1;
# vim: set sw=4 ts=4 tw=78 noexpandtab :