Merge branch 'ms/aix'
[collectd.git] / bindings / perl / Collectd.pm
index fd5632a..557950c 100644 (file)
@@ -1,5 +1,5 @@
 # collectd - Collectd.pm
-# Copyright (C) 2007  Sebastian Harl
+# Copyright (C) 2007-2009  Sebastian Harl
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License as published by the
@@ -22,6 +22,17 @@ package Collectd;
 use strict;
 use warnings;
 
+use Config;
+
+use threads;
+use threads::shared;
+
+BEGIN {
+       if (! $Config{'useithreads'}) {
+               die "Perl does not support ithreads!";
+       }
+}
+
 require Exporter;
 
 our @ISA = qw( Exporter );
@@ -31,6 +42,11 @@ our %EXPORT_TAGS = (
                        plugin_register
                        plugin_unregister
                        plugin_dispatch_values
+                       plugin_write
+                       plugin_flush
+                       plugin_flush_one
+                       plugin_flush_all
+                       plugin_dispatch_notification
                        plugin_log
        ) ],
        'types' => [ qw(
@@ -39,6 +55,9 @@ our %EXPORT_TAGS = (
                        TYPE_WRITE
                        TYPE_SHUTDOWN
                        TYPE_LOG
+                       TYPE_NOTIF
+                       TYPE_FLUSH
+                       TYPE_CONFIG
                        TYPE_DATASET
        ) ],
        'ds_types' => [ qw(
@@ -57,6 +76,27 @@ our %EXPORT_TAGS = (
                        LOG_INFO
                        LOG_DEBUG
        ) ],
+       'filter_chain' => [ qw(
+                       fc_register
+                       FC_MATCH_NO_MATCH
+                       FC_MATCH_MATCHES
+                       FC_TARGET_CONTINUE
+                       FC_TARGET_STOP
+                       FC_TARGET_RETURN
+       ) ],
+       'fc_types' => [ qw(
+                       FC_MATCH
+                       FC_TARGET
+       ) ],
+       'notif' => [ qw(
+                       NOTIF_FAILURE
+                       NOTIF_WARNING
+                       NOTIF_OKAY
+       ) ],
+       'globals' => [ qw(
+                       $hostname_g
+                       $interval_g
+       ) ],
 );
 
 {
@@ -65,21 +105,42 @@ our %EXPORT_TAGS = (
                foreach keys %EXPORT_TAGS;
 }
 
+# global variables
+our $hostname_g;
+our $interval_g;
+
 Exporter::export_ok_tags ('all');
 
-my @plugins  = ();
-my @datasets = ();
+my @plugins : shared = ();
+my @fc_plugins : shared = ();
+my %cf_callbacks : shared = ();
 
 my %types = (
        TYPE_INIT,     "init",
        TYPE_READ,     "read",
        TYPE_WRITE,    "write",
        TYPE_SHUTDOWN, "shutdown",
-       TYPE_LOG,      "log"
+       TYPE_LOG,      "log",
+       TYPE_NOTIF,    "notify",
+       TYPE_FLUSH,    "flush"
+);
+
+my %fc_types = (
+       FC_MATCH,  "match",
+       FC_TARGET, "target"
+);
+
+my %fc_exec_names = (
+       FC_MATCH,  "match",
+       FC_TARGET, "invoke"
 );
 
 foreach my $type (keys %types) {
-       $plugins[$type] = {};
+       $plugins[$type] = &share ({});
+}
+
+foreach my $type (keys %fc_types) {
+       $fc_plugins[$type] = &share ({});
 }
 
 sub _log {
@@ -102,6 +163,10 @@ sub DEBUG   { _log (scalar caller, LOG_DEBUG,   shift); }
 sub plugin_call_all {
        my $type = shift;
 
+       my %plugins;
+
+       our $cb_name = undef;
+
        if (! defined $type) {
                return;
        }
@@ -115,21 +180,51 @@ sub plugin_call_all {
                return;
        }
 
-       foreach my $plugin (keys %{$plugins[$type]}) {
-               my $p = $plugins[$type]->{$plugin};
+       {
+               lock %{$plugins[$type]};
+               %plugins = %{$plugins[$type]};
+       }
+
+       foreach my $plugin (keys %plugins) {
+               my $p = $plugins{$plugin};
+
+               my $status = 0;
 
                if ($p->{'wait_left'} > 0) {
-                       # TODO: use interval_g
-                       $p->{'wait_left'} -= 10;
+                       $p->{'wait_left'} -= $interval_g;
                }
 
                next if ($p->{'wait_left'} > 0);
 
-               if (my $status = $p->{'code'}->(@_)) {
+               $cb_name = $p->{'cb_name'};
+               $status = call_by_name (@_);
+
+               if (! $status) {
+                       my $err = undef;
+
+                       if ($@) {
+                               $err = $@;
+                       }
+                       else {
+                               $err = "callback returned false";
+                       }
+
+                       if (TYPE_LOG != $type) {
+                               ERROR ("Execution of callback \"$cb_name\" failed: $err");
+                       }
+
+                       $status = 0;
+               }
+
+               if ($status) {
                        $p->{'wait_left'} = 0;
-                       $p->{'wait_time'} = 10;
+                       $p->{'wait_time'} = $interval_g;
                }
                elsif (TYPE_READ == $type) {
+                       if ($p->{'wait_time'} < $interval_g) {
+                               $p->{'wait_time'} = $interval_g;
+                       }
+
                        $p->{'wait_left'} = $p->{'wait_time'};
                        $p->{'wait_time'} *= 2;
 
@@ -141,12 +236,12 @@ sub plugin_call_all {
                                . "Will suspend it for $p->{'wait_left'} seconds.");
                }
                elsif (TYPE_INIT == $type) {
+                       ERROR ("${plugin}->init() failed with status $status. "
+                               . "Plugin will be disabled.");
+
                        foreach my $type (keys %types) {
                                plugin_unregister ($type, $plugin);
                        }
-
-                       ERROR ("${plugin}->init() failed with status $status. "
-                               . "Plugin will be disabled.");
                }
                elsif (TYPE_LOG != $type) {
                        WARNING ("${plugin}->$types{$type}() failed with status $status.");
@@ -179,7 +274,8 @@ sub plugin_register {
                return;
        }
 
-       if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)) {
+       if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)
+                       && (TYPE_CONFIG != $type)) {
                ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
                return;
        }
@@ -187,13 +283,33 @@ sub plugin_register {
        if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
                return plugin_register_data_set ($name, $data);
        }
-       elsif ("CODE" eq ref $data) {
-               # TODO: make interval_g available at configuration time
-               $plugins[$type]->{$name} = {
-                               wait_time => 10,
-                               wait_left => 0,
-                               code      => $data,
-               };
+       elsif ((TYPE_CONFIG == $type) && (! ref $data)) {
+               my $pkg = scalar caller;
+
+               if ($data !~ m/^$pkg\:\:/) {
+                       $data = $pkg . "::" . $data;
+               }
+
+               lock %cf_callbacks;
+               $cf_callbacks{$name} = $data;
+       }
+       elsif ((TYPE_DATASET != $type) && (! ref $data)) {
+               my $pkg = scalar caller;
+
+               my %p : shared;
+
+               if ($data !~ m/^$pkg\:\:/) {
+                       $data = $pkg . "::" . $data;
+               }
+
+               %p = (
+                       wait_time => $interval_g,
+                       wait_left => 0,
+                       cb_name   => $data,
+               );
+
+               lock %{$plugins[$type]};
+               $plugins[$type]->{$name} = \%p;
        }
        else {
                ERROR ("Collectd::plugin_register: Invalid data.");
@@ -216,7 +332,12 @@ sub plugin_unregister {
        if (TYPE_DATASET == $type) {
                return plugin_unregister_data_set ($name);
        }
+       elsif (TYPE_CONFIG == $type) {
+               lock %cf_callbacks;
+               delete $cf_callbacks{$name};
+       }
        elsif (defined $plugins[$type]) {
+               lock %{$plugins[$type]};
                delete $plugins[$type]->{$name};
        }
        else {
@@ -225,6 +346,302 @@ sub plugin_unregister {
        }
 }
 
+sub plugin_write {
+       my %args = @_;
+
+       my @plugins    = ();
+       my @datasets   = ();
+       my @valuelists = ();
+
+       if (! defined $args{'valuelists'}) {
+               ERROR ("Collectd::plugin_write: Missing 'valuelists' argument.");
+               return;
+       }
+
+       DEBUG ("Collectd::plugin_write:"
+               . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
+               . (defined ($args{'datasets'}) ? " datasets = $args{'datasets'}" : "")
+               . " valueslists = $args{'valuelists'}");
+
+       if (defined ($args{'plugins'})) {
+               if ("ARRAY" eq ref ($args{'plugins'})) {
+                       @plugins = @{$args{'plugins'}};
+               }
+               else {
+                       @plugins = ($args{'plugins'});
+               }
+       }
+       else {
+               @plugins = (undef);
+       }
+
+       if ("ARRAY" eq ref ($args{'valuelists'})) {
+               @valuelists = @{$args{'valuelists'}};
+       }
+       else {
+               @valuelists = ($args{'valuelists'});
+       }
+
+       if (defined ($args{'datasets'})) {
+               if ("ARRAY" eq ref ($args{'datasets'})) {
+                       @datasets = @{$args{'datasets'}};
+               }
+               else {
+                       @datasets = ($args{'datasets'});
+               }
+       }
+       else {
+               @datasets = (undef) x scalar (@valuelists);
+       }
+
+       if ($#datasets != $#valuelists) {
+               ERROR ("Collectd::plugin_write: Invalid number of datasets.");
+               return;
+       }
+
+       foreach my $plugin (@plugins) {
+               for (my $i = 0; $i < scalar (@valuelists); ++$i) {
+                       _plugin_write ($plugin, $datasets[$i], $valuelists[$i]);
+               }
+       }
+}
+
+sub plugin_flush {
+       my %args = @_;
+
+       my $timeout = -1;
+       my @plugins = ();
+       my @ids     = ();
+
+       DEBUG ("Collectd::plugin_flush:"
+               . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
+               . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
+               . (defined ($args{'identifiers'})
+                       ? " identifiers = $args{'identifiers'}" : ""));
+
+       if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
+               $timeout = $args{'timeout'};
+       }
+
+       if (defined ($args{'plugins'})) {
+               if ("ARRAY" eq ref ($args{'plugins'})) {
+                       @plugins = @{$args{'plugins'}};
+               }
+               else {
+                       @plugins = ($args{'plugins'});
+               }
+       }
+       else {
+               @plugins = (undef);
+       }
+
+       if (defined ($args{'identifiers'})) {
+               if ("ARRAY" eq ref ($args{'identifiers'})) {
+                       @ids = @{$args{'identifiers'}};
+               }
+               else {
+                       @ids = ($args{'identifiers'});
+               }
+       }
+       else {
+               @ids = (undef);
+       }
+
+       foreach my $plugin (@plugins) {
+               foreach my $id (@ids) {
+                       _plugin_flush($plugin, $timeout, $id);
+               }
+       }
+}
+
+sub plugin_flush_one {
+       my $timeout = shift;
+       my $name    = shift;
+
+       WARNING ("Collectd::plugin_flush_one is deprecated - "
+               . "use Collectd::plugin_flush instead.");
+
+       if (! (defined ($timeout) && defined ($name))) {
+               ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
+               return;
+       }
+
+       plugin_flush (plugins => $name, timeout => $timeout);
+}
+
+sub plugin_flush_all {
+       my $timeout = shift;
+
+       WARNING ("Collectd::plugin_flush_all is deprecated - "
+               . "use Collectd::plugin_flush instead.");
+
+       if (! defined ($timeout)) {
+               ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
+               return;
+       }
+
+       plugin_flush (timeout => $timeout);
+}
+
+sub fc_call {
+       my $type    = shift;
+       my $name    = shift;
+       my $cb_type = shift;
+
+       my %proc;
+
+       our $cb_name = undef;
+       my  $status;
+
+       if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
+               ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
+               return;
+       }
+
+       if (! defined $fc_plugins[$type]) {
+               ERROR ("Collectd::fc_call: Invalid type \"$type\"");
+               return;
+       }
+
+       if (! defined $fc_plugins[$type]->{$name}) {
+               ERROR ("Collectd::fc_call: Unknown "
+                       . ($type == FC_MATCH ? "match" : "target")
+                       . " \"$name\"");
+               return;
+       }
+
+       DEBUG ("Collectd::fc_call: "
+               . "type = \"$type\", name = \"$name\", cb_type = \"$cb_type\"");
+
+       {
+               lock %{$fc_plugins[$type]};
+               %proc = %{$fc_plugins[$type]->{$name}};
+       }
+
+       if (FC_CB_EXEC == $cb_type) {
+               $cb_name = $proc{$fc_exec_names{$type}};
+       }
+       elsif (FC_CB_CREATE == $cb_type) {
+               if (defined $proc{'create'}) {
+                       $cb_name = $proc{'create'};
+               }
+               else {
+                       return 1;
+               }
+       }
+       elsif (FC_CB_DESTROY == $cb_type) {
+               if (defined $proc{'destroy'}) {
+                       $cb_name = $proc{'destroy'};
+               }
+               else {
+                       return 1;
+               }
+       }
+
+       $status = call_by_name (@_);
+
+       if ($status < 0) {
+               my $err = undef;
+
+               if ($@) {
+                       $err = $@;
+               }
+               else {
+                       $err = "callback returned false";
+               }
+
+               ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
+               return;
+       }
+       return $status;
+}
+
+sub fc_register {
+       my $type = shift;
+       my $name = shift;
+       my $proc = shift;
+
+       my %fc : shared;
+
+       DEBUG ("Collectd::fc_register: "
+               . "type = \"$type\", name = \"$name\", proc = \"$proc\"");
+
+       if (! ((defined $type) && (defined $name) && (defined $proc))) {
+               ERROR ("Usage: Collectd::fc_register(type, name, proc)");
+               return;
+       }
+
+       if (! defined $fc_plugins[$type]) {
+               ERROR ("Collectd::fc_register: Invalid type \"$type\"");
+               return;
+       }
+
+       if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
+                       || ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
+               ERROR ("Collectd::fc_register: Invalid proc.");
+               return;
+       }
+
+       for my $p (qw( create destroy )) {
+               if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
+                       ERROR ("Collectd::fc_register: Invalid proc.");
+                       return;
+               }
+       }
+
+       %fc = %$proc;
+
+       foreach my $p (keys %fc) {
+               my $pkg = scalar caller;
+
+               if ($p !~ m/^(create|destroy|$fc_exec_names{$type})$/) {
+                       next;
+               }
+
+               if ($fc{$p} !~ m/^$pkg\:\:/) {
+                       $fc{$p} = $pkg . "::" . $fc{$p};
+               }
+       }
+
+       lock %{$fc_plugins[$type]};
+       if (defined $fc_plugins[$type]->{$name}) {
+               WARNING ("Collectd::fc_register: Overwriting previous "
+                       . "definition of match \"$name\".");
+       }
+
+       if (! _fc_register ($type, $name)) {
+               ERROR ("Collectd::fc_register: Failed to register \"$name\".");
+               return;
+       }
+
+       $fc_plugins[$type]->{$name} = \%fc;
+       return 1;
+}
+
+sub _plugin_dispatch_config {
+       my $plugin = shift;
+       my $config = shift;
+
+       our $cb_name = undef;
+
+       if (! (defined ($plugin) && defined ($config))) {
+               return;
+       }
+
+       if (! defined $cf_callbacks{$plugin}) {
+               WARNING ("Found a configuration for the \"$plugin\" plugin, but "
+                       . "the plugin isn't loaded or didn't register "
+                       . "a configuration callback.");
+               return;
+       }
+
+       {
+               lock %cf_callbacks;
+               $cb_name = $cf_callbacks{$plugin};
+       }
+       call_by_name ($config);
+}
+
 1;
 
 # vim: set sw=4 ts=4 tw=78 noexpandtab :