Merge branch 'collectd-4.7'
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Mon, 17 Aug 2009 08:46:05 +0000 (10:46 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Mon, 17 Aug 2009 08:46:05 +0000 (10:46 +0200)
Conflicts:
src/network.c

1  2 
src/collectd.conf.in
src/collectd.conf.pod
src/java.c
src/meta_data.c
src/plugin.c
src/utils_cache.c

diff --combined src/collectd.conf.in
@@@ -57,7 -57,6 +57,7 @@@ FQDNLookup   tru
  #@BUILD_PLUGIN_BATTERY_TRUE@LoadPlugin battery
  #@BUILD_PLUGIN_BIND_TRUE@LoadPlugin bind
  #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack
 +#@BUILD_PLUGIN_COUCHDB_TRUE@LoadPlugin couchdb
  @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu
  #@BUILD_PLUGIN_CPUFREQ_TRUE@LoadPlugin cpufreq
  @LOAD_PLUGIN_CSV@LoadPlugin csv
@@@ -95,7 -94,6 +95,7 @@@
  #@BUILD_PLUGIN_NOTIFY_EMAIL_TRUE@LoadPlugin notify_email
  #@BUILD_PLUGIN_NTPD_TRUE@LoadPlugin ntpd
  #@BUILD_PLUGIN_NUT_TRUE@LoadPlugin nut
 +#@BUILD_PLUGIN_OLSRD_TRUE@LoadPlugin olsrd
  #@BUILD_PLUGIN_ONEWIRE_TRUE@LoadPlugin onewire
  #@BUILD_PLUGIN_OPENVPN_TRUE@LoadPlugin openvpn
  #@BUILD_PLUGIN_ORACLE_TRUE@LoadPlugin oracle
  #@BUILD_PLUGIN_TEAMSPEAK2_TRUE@LoadPlugin teamspeak2
  #@BUILD_PLUGIN_TED_TRUE@LoadPlugin ted
  #@BUILD_PLUGIN_THERMAL_TRUE@LoadPlugin thermal
 +#@BUILD_PLUGIN_TOKYOTYRANT_TRUE@LoadPlugin tokyotyrant
  #@BUILD_PLUGIN_UNIXSOCK_TRUE@LoadPlugin unixsock
  #@BUILD_PLUGIN_UPTIME_TRUE@LoadPlugin uptime
  #@BUILD_PLUGIN_USERS_TRUE@LoadPlugin users
  #  </View>
  #</Plugin>
  
 +#<Plugin couchdb>
 +## See: http://wiki.apache.org/couchdb/Runtime_Statistics
 +#  <URL "http://localhost:5984/_stats">
 +#    Instance "httpd"
 +#    <Key "httpd/requests/count">
 +#      Type "http_requests"
 +#    </Key>
 +#
 +#    <Key "httpd_request_methods/*/count">
 +#      Type "http_request_methods"
 +#    </Key>
 +#
 +#    <Key "httpd_status_codes/*/count">
 +#      Type "http_response_codes"
 +#    </Key>
 +#  </URL>
 +## Database status metrics:
 +#  <URL "http://localhost:5984/_all_dbs">
 +#    Instance "dbs"
 +#    <Key "*/doc_count">
 +#      Type "gauge"
 +#    </Key>
 +#    <Key "*/doc_del_count">
 +#      Type "counter"
 +#    </Key>
 +#    <Key "*/disk_size">
 +#      Type "bytes"
 +#    </Key>
 +#  </URL>
 +#</Plugin>
 +
  #<Plugin csv>
  #     DataDir "@prefix@/var/lib/@PACKAGE_NAME@/csv"
  #     StoreRates false
  #     MountPoint "/home"
  #     FSType "ext3"
  #     IgnoreSelected false
 +#     ReportByDevice false
  #</Plugin>
  
  #<Plugin disk>
  #  Port "7634"
  #
  #  #----------------------------------------------------------------#
- #  # `TranslateDevicename' enabled backwards compatibility behavior #
+ #  # `TranslateDevicename' enables backwards compatibility behavior #
  #  # and is enabled by default. Setting this option to `false' is   #
  #  # highly recommended.                                            #
  #  #----------------------------------------------------------------#
  
  #<Plugin "java">
  #     JVMArg "-verbose:jni"
 -#     JVMArg "-Djava.class.path=/opt/collectd/lib/collectd/bindings/java"
 +#     JVMArg "-Djava.class.path=@prefix@/share/collectd/java"
  #
  #     LoadPlugin "org.collectd.java.Foobar"
  #     <Plugin "org.collectd.java.Foobar">
  #     UPS "upsname@hostname:port"
  #</Plugin>
  
 +#<Plugin olsrd>
 +#     Host "127.0.0.1"
 +#     Port "2006"
 +#     CollectLinks "Summary"
 +#     CollectRoutes "Summary"
 +#     CollectTopology "Summary"
 +#</Plugin>
 +
  #<Plugin onewire>
  #     Device "-s localhost:4304"
  #     Sensor "F10FCA000800"
  #       Version 1
  #       Community "community_string"
  #       Collect "std_traffic"
 -#       Inverval 120
 +#       Interval 120
  #   </Host>
  #   <Host "some.server.mydomain.org">
  #       Address "192.168.0.42"
  #     IgnoreSelected false
  #</Plugin>
  
 +#<Plugin tokyotyrant>
 +#     Host "localhost"
 +#     Port "1978"
 +#</Plugin>
 +
  #<Plugin unixsock>
  #     SocketFile "@prefix@/var/run/@PACKAGE_NAME@-unixsock"
  #     SocketGroup "collectd"
diff --combined src/collectd.conf.pod
@@@ -409,106 -409,6 +409,106 @@@ By default no detailed zone informatio
  
  =back
  
 +=head2 Plugin C<couchdb>
 +
 +The couchdb plugin uses B<libcurl> (L<http://curl.haxx.se/>) and B<libyajl>
 +(L<http://www.lloydforge.org/projects/yajl/>) to collect values from CouchDB
 +documents (stored JSON notation).
 +
 +The following example will collect several values from the built-in `_stats'
 +runtime statistics module (L<http://wiki.apache.org/couchdb/Runtime_Statistics>).
 +
 +  <Plugin couchdb>
 +    <URL "http://localhost:5984/_stats">
 +      Instance "httpd"
 +      <Key "httpd/requests/count">
 +        Type "http_requests"
 +      </Key>
 +
 +      <Key "httpd_request_methods/*/count">
 +        Type "http_request_methods"
 +      </Key>
 +
 +      <Key "httpd_status_codes/*/count">
 +        Type "http_response_codes"
 +      </Key>
 +    </URL>
 +  </Plugin>
 +
 +The following example will collect the status values from each database:
 +
 +  <URL "http://localhost:5984/_all_dbs">
 +    Instance "dbs"
 +    <Key "*/doc_count">
 +      Type "gauge"
 +    </Key>
 +    <Key "*/doc_del_count">
 +      Type "counter"
 +    </Key>
 +    <Key "*/disk_size">
 +      Type "bytes"
 +    </Key>
 +  </URL>
 +
 +In the B<Plugin> block, there may be one or more B<URL> blocks, each defining
 +a URL to be fetched via HTTP (libcurl) and one or more B<Key> blocks.
 +The B<Key> string argument must be in a path format, of which is used to collect
 +a value from a JSON map object.  If a B<Key> path element is that of a I<*> wildcard,
 +the values for all keys will be collectd.
 +
 +The following options are valid within B<URL> blocks:
 +
 +=over 4
 +
 +=item B<Instance> I<Instance>
 +
 +Sets the plugin instance to I<Instance>.
 +
 +=item B<User> I<Name>
 +
 +Username to use if authorization is required to read the page.
 +
 +=item B<Password> I<Password>
 +
 +Password to use if authorization is required to read the page.
 +
 +=item B<VerifyPeer> B<true>|B<false>
 +
 +Enable or disable peer SSL certificate verification. See
 +L<http://curl.haxx.se/docs/sslcerts.html> for details. Enabled by default.
 +
 +=item B<VerifyHost> B<true>|B<false>
 +
 +Enable or disable peer host name verification. If enabled, the plugin checks if
 +the C<Common Name> or a C<Subject Alternate Name> field of the SSL certificate
 +matches the host name provided by the B<URL> option. If this identity check
 +fails, the connection is aborted. Obviously, only works when connecting to a
 +SSL enabled server. Enabled by default.
 +
 +=item B<CACert> I<file>
 +
 +File that holds one or more SSL certificates. If you want to use HTTPS you will
 +possibly need this option. What CA certificates come bundled with C<libcurl>
 +and are checked by default depends on the distribution you use.
 +
 +=back
 +
 +The following options are valid within B<Key> blocks:
 +
 +=over 4
 +
 +=item B<Type> I<Type>
 +
 +Sets the type used to dispatch the values to the daemon. Detailed information
 +about types and their configuration can be found in L<types.db(5)>. This
 +option is mandatory.
 +
 +=item B<Instance> I<Instance>
 +
 +Type-instance to use. Defaults to the current map key or current string array element value.
 +
 +=back
 +
  =head2 Plugin C<cpufreq>
  
  This plugin doesn't have any options. It reads
@@@ -870,12 -770,6 +870,12 @@@ match any one of the criteria are colle
  partitions are collected if a selection is made. If no selection is configured
  at all, B<all> partitions are selected.
  
 +=item B<ReportByDevice> I<true>|I<false>
 +
 +Report using the device name rather than the mountpoint. i.e. with this I<false>,
 +(the default), it will report a disk as "root", but with it I<true>, it will be
 +"sda1" (or whichever).
 +
  =back
  
  =head2 Plugin C<disk>
@@@ -927,10 -821,6 +927,10 @@@ may not work on certain platforms, suc
  
  Ignore packets that originate from this address.
  
 +=item B<SelectNumericQueryTypes> B<true>|B<false>
 +
 +Enabled by default, collects unknown (and thus presented as numeric only) query types.
 +
  =back
  
  =head2 Plugin C<email>
@@@ -2012,55 -1902,6 +2012,55 @@@ L<upsc(8)>
  
  =back
  
 +=head2 Plugin C<olsrd>
 +
 +The I<olsrd> plugin connects to the TCP port opened by the I<txtinfo> plugin of
 +the Optimized Link State Routing daemon and reads information about the current
 +state of the meshed network.
 +
 +The following configuration options are understood:
 +
 +=over 4
 +
 +=item B<Host> I<Host>
 +
 +Connect to I<Host>. Defaults to B<"localhost">.
 +
 +=item B<Port> I<Port>
 +
 +Specifies the port to connect to. This must be a string, even if you give the
 +port as a number rather than a service name. Defaults to B<"2006">.
 +
 +=item B<CollectLinks> B<No>|B<Summary>|B<Detail>
 +
 +Specifies what information to collect about links, i.E<nbsp>e. direct
 +connections of the daemon queried. If set to B<No>, no information is
 +collected. If set to B<Summary>, the number of links and the average of all
 +I<link quality> (LQ) and I<neighbor link quality> (NLQ) values is calculated.
 +If set to B<Detail> LQ and NLQ are collected per link.
 +
 +Defaults to B<Detail>.
 +
 +=item B<CollectRoutes> B<No>|B<Summary>|B<Detail>
 +
 +Specifies what information to collect about routes of the daemon queried. If
 +set to B<No>, no information is collected. If set to B<Summary>, the number of
 +routes and the average I<metric> and I<ETX> is calculated. If set to B<Detail>
 +metric and ETX are collected per route.
 +
 +Defaults to B<Summary>.
 +
 +=item B<CollectTopology> B<No>|B<Summary>|B<Detail>
 +
 +Specifies what information to collect about the global topology. If set to
 +B<No>, no information is collected. If set to B<Summary>, the number of links
 +in the entire topology and the average I<link quality> (LQ) is calculated.
 +If set to B<Detail> LQ and NLQ are collected for each link in the entire topology.
 +
 +Defaults to B<Summary>.
 +
 +=back
 +
  =head2 Plugin C<onewire>
  
  B<EXPERIMENTAL!> See notes below.
@@@ -2785,7 -2626,7 +2785,7 @@@ matching values will be ignored
  
  =head2 Plugin C<rrdcached>
  
- The C<rrdcached> plugin uses the RRDTool accelerator daemon, L<rrdcached(1)>,
+ The C<rrdcached> plugin uses the RRDtool accelerator daemon, L<rrdcached(1)>,
  to store values to RRD files in an efficient manner. The combination of the
  C<rrdcached> B<plugin> and the C<rrdcached> B<daemon> is very similar to the
  way the C<rrdtool> plugin works (see below). The added abstraction layer
@@@ -2837,7 -2678,7 +2837,7 @@@ expected. Default is B<true>
  
  You can use the settings B<StepSize>, B<HeartBeat>, B<RRARows>, and B<XFF> to
  fine-tune your RRD-files. Please read L<rrdcreate(1)> if you encounter problems
- using these settings. If you don't want to dive into the depths of RRDTool, you
+ using these settings. If you don't want to dive into the depths of RRDtool, you
  can safely ignore these settings.
  
  =over 4
@@@ -3322,26 -3163,6 +3322,26 @@@ selection is configured at all, B<all> 
  
  =back
  
 +=head2 Plugin C<tokyotyrant>
 +
 +The C<tokyotyrant plugin> connects to a TokyoTyrant server and collects a 
 +couple metrics: number of records, and database size on disk.
 +
 +=over 4
 +
 +=item B<Host> I<Hostname/IP>
 +
 +The hostname or ip which identifies the server.
 +Default: B<127.0.0.1>
 +
 +=item B<Port> I<Service/Port>
 +
 +The query port of the server. This needs to be a string, even if the port is
 +given in its numeric form.
 +Default: B<1978>
 +
 +=back
 +
  =head2 Plugin C<unixsock>
  
  =over 4
@@@ -3563,13 -3384,6 +3563,13 @@@ This applies to missing values, too: I
  missing value is generated once every B<Interval> seconds. If set to B<false>
  only one such notification is generated until the value appears again.
  
 +=item B<Percentage> B<true>|B<false>
 +
 +If set to B<true>, the minimum and maximum values given are interpreted as
 +percentage value, relative to the other data sources. This is helpful for
 +example for the "df" type, where you may want to issue a warning when less than
 +5E<nbsp>% of the total space is available. Defaults to B<false>.
 +
  =back
  
  =head1 FILTER CONFIGURATION
@@@ -4063,19 -3877,6 +4063,19 @@@ Example
     Satisfy "Any"
   </Match>
  
 +=item B<empty_counter>
 +
 +Matches all values with one or more data sources of type B<COUNTER> and where
 +all counter values are zero. These counters usually I<never> increased since
 +they started existing (and are therefore uninteresting), or got reset recently
 +or overflowed and you had really, I<really> bad luck.
 +
 +Please keep in mind that ignoring such counters can result in confusing
 +behavior: Counters which hardly ever increase will be zero for long periods of
 +time. If the counter is reset for some reason (machine or service restarted,
 +usually), the graph will be empty (NAN) for a long time. People may not
 +understand why.
 +
  =back
  
  =head2 Available targets
diff --combined src/java.c
@@@ -93,6 -93,8 +93,8 @@@ static cjni_callback_info_t *java_callb
  static size_t                java_callbacks_num  = 0;
  static pthread_mutex_t       java_callbacks_lock = PTHREAD_MUTEX_INITIALIZER;
  
+ static oconfig_item_t       *config_block = NULL;
  /*
   * Prototypes
   *
@@@ -284,10 -286,6 +286,10 @@@ static jobject ctoj_value_to_number (JN
      return (ctoj_jlong_to_number (jvm_env, (jlong) value.counter));
    else if (ds_type == DS_TYPE_GAUGE)
      return (ctoj_jdouble_to_number (jvm_env, (jdouble) value.gauge));
 +  if (ds_type == DS_TYPE_DERIVE)
 +    return (ctoj_jlong_to_number (jvm_env, (jlong) value.derive));
 +  if (ds_type == DS_TYPE_ABSOLUTE)
 +    return (ctoj_jlong_to_number (jvm_env, (jlong) value.absolute));
    else
      return (NULL);
  } /* }}} jobject ctoj_value_to_number */
@@@ -1046,39 -1044,33 +1048,39 @@@ static int jtoc_value (JNIEnv *jvm_env
  
    class_ptr = (*jvm_env)->GetObjectClass (jvm_env, object_ptr);
  
 -  if (ds_type == DS_TYPE_COUNTER)
 +  if (ds_type == DS_TYPE_GAUGE)
    {
 -    jlong tmp_long;
 +    jdouble tmp_double;
  
 -    status = jtoc_long (jvm_env, &tmp_long,
 -        class_ptr, object_ptr, "longValue");
 +    status = jtoc_double (jvm_env, &tmp_double,
 +        class_ptr, object_ptr, "doubleValue");
      if (status != 0)
      {
        ERROR ("java plugin: jtoc_value: "
 -          "jtoc_long failed.");
 +          "jtoc_double failed.");
        return (-1);
      }
 -    (*ret_value).counter = (counter_t) tmp_long;
 +    (*ret_value).gauge = (gauge_t) tmp_double;
    }
    else
    {
 -    jdouble tmp_double;
 +    jlong tmp_long;
  
 -    status = jtoc_double (jvm_env, &tmp_double,
 -        class_ptr, object_ptr, "doubleValue");
 +    status = jtoc_long (jvm_env, &tmp_long,
 +        class_ptr, object_ptr, "longValue");
      if (status != 0)
      {
        ERROR ("java plugin: jtoc_value: "
 -          "jtoc_double failed.");
 +          "jtoc_long failed.");
        return (-1);
      }
 -    (*ret_value).gauge = (gauge_t) tmp_double;
 +
 +    if (ds_type == DS_TYPE_DERIVE)
 +      (*ret_value).derive = (derive_t) tmp_long;
 +    else if (ds_type == DS_TYPE_ABSOLUTE)
 +      (*ret_value).absolute = (absolute_t) tmp_long;
 +    else
 +      (*ret_value).counter = (counter_t) tmp_long;
    }
  
    return (0);
@@@ -2314,7 -2306,7 +2316,7 @@@ static int cjni_config_plugin_block (oc
    return (0);
  } /* }}} int cjni_config_plugin_block */
  
- static int cjni_config (oconfig_item_t *ci) /* {{{ */
+ static int cjni_config_perform (oconfig_item_t *ci) /* {{{ */
  {
    int success;
    int errors;
    }
  
    return (0);
- } /* }}} int cjni_config */
+ } /* }}} int cjni_config_perform */
+ /* Copy the children of `ci' to the global `config_block' variable. */
+ static int cjni_config_callback (oconfig_item_t *ci) /* {{{ */
+ {
+   oconfig_item_t *ci_copy;
+   oconfig_item_t *tmp;
+   assert (ci != NULL);
+   if (ci->children_num == 0)
+     return (0); /* nothing to do */
+   ci_copy = oconfig_clone (ci);
+   if (ci_copy == NULL)
+   {
+     ERROR ("java plugin: oconfig_clone failed.");
+     return (-1);
+   }
+   if (config_block == NULL)
+   {
+     config_block = ci_copy;
+     return (0);
+   }
+   tmp = realloc (config_block->children,
+       (config_block->children_num + ci_copy->children_num) * sizeof (*tmp));
+   if (tmp == NULL)
+   {
+     ERROR ("java plugin: realloc failed.");
+     oconfig_free (ci_copy);
+     return (-1);
+   }
+   config_block->children = tmp;
+   /* Copy the pointers */
+   memcpy (config_block->children + config_block->children_num,
+       ci_copy->children,
+       ci_copy->children_num * sizeof (*ci_copy->children));
+   /* Delete the pointers from the copy, so `oconfig_free' can't free them. */
+   memset (ci_copy->children, 0,
+       ci_copy->children_num * sizeof (*ci_copy->children));
+   ci_copy->children_num = 0;
+   oconfig_free (ci_copy);
+   return (0);
+ } /* }}} int cjni_config_callback */
  
  /* Free the data contained in the `user_data_t' pointer passed to `cjni_read'
   * and `cjni_write'. In particular, delete the global reference to the Java
@@@ -3005,6 -3045,22 +3055,22 @@@ static int cjni_init (void) /* {{{ *
  {
    JNIEnv *jvm_env;
  
+   if ((config_block == NULL) && (jvm == NULL))
+   {
+     ERROR ("java plugin: cjni_init: No configuration block for "
+         "the java plugin was found.");
+     return (-1);
+   }
+   if (config_block != NULL)
+   {
+     int status;
+     status = cjni_config_perform (config_block);
+     oconfig_free (config_block);
+     config_block = NULL;
+   }
    if (jvm == NULL)
    {
      ERROR ("java plugin: cjni_init: jvm == NULL");
  
  void module_register (void)
  {
-   plugin_register_complex_config ("java", cjni_config);
+   plugin_register_complex_config ("java", cjni_config_callback);
    plugin_register_init ("java", cjni_init);
    plugin_register_shutdown ("java", cjni_shutdown);
  } /* void module_register (void) */
diff --combined src/meta_data.c
@@@ -1,6 -1,6 +1,6 @@@
  /**
   * collectd - src/meta_data.c
 - * Copyright (C) 2008  Florian octo Forster
 + * Copyright (C) 2008,2009  Florian octo Forster
   *
   * This program is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License as published by the
@@@ -32,7 -32,6 +32,7 @@@
  #define MD_TYPE_SIGNED_INT   2
  #define MD_TYPE_UNSIGNED_INT 3
  #define MD_TYPE_DOUBLE       4
 +#define MD_TYPE_BOOLEAN      5
  
  /*
   * Data types
@@@ -43,7 -42,6 +43,7 @@@ union meta_value_
    int64_t  mv_signed_int;
    uint64_t mv_unsigned_int;
    double   mv_double;
 +  _Bool    mv_boolean;
  };
  typedef union meta_value_u meta_value_t;
  
@@@ -289,9 -287,6 +289,9 @@@ int meta_data_delete (meta_data_t *md, 
    return (0);
  } /* }}} int meta_data_delete */
  
 +/*
 + * Add functions
 + */
  int meta_data_add_string (meta_data_t *md, /* {{{ */
      const char *key, const char *value)
  {
@@@ -370,27 -365,6 +370,27 @@@ int meta_data_add_double (meta_data_t *
    return (md_entry_insert (md, e));
  } /* }}} int meta_data_add_double */
  
 +int meta_data_add_boolean (meta_data_t *md, /* {{{ */
 +    const char *key, _Bool value)
 +{
 +  meta_entry_t *e;
 +
 +  if ((md == NULL) || (key == NULL))
 +    return (-EINVAL);
 +
 +  e = md_entry_alloc (key);
 +  if (e == NULL)
 +    return (-ENOMEM);
 +
 +  e->value.mv_boolean = value;
 +  e->type = MD_TYPE_BOOLEAN;
 +
 +  return (md_entry_insert (md, e));
 +} /* }}} int meta_data_add_boolean */
 +
 +/*
 + * Get functions
 + */
  int meta_data_get_string (meta_data_t *md, /* {{{ */
      const char *key, char **value)
  {
      return (-ENOENT);
    }
  
-   if (e->type != MD_TYPE_SIGNED_INT)
+   if (e->type != MD_TYPE_STRING)
    {
      ERROR ("meta_data_get_signed_int: Type mismatch for key `%s'", e->key);
      pthread_mutex_unlock (&md->lock);
@@@ -521,34 -495,4 +521,34 @@@ int meta_data_get_double (meta_data_t *
    return (0);
  } /* }}} int meta_data_get_double */
  
 +int meta_data_get_boolean (meta_data_t *md, /* {{{ */
 +    const char *key, _Bool *value)
 +{
 +  meta_entry_t *e;
 +
 +  if ((md == NULL) || (key == NULL) || (value == NULL))
 +    return (-EINVAL);
 +
 +  pthread_mutex_lock (&md->lock);
 +
 +  e = md_entry_lookup (md, key);
 +  if (e == NULL)
 +  {
 +    pthread_mutex_unlock (&md->lock);
 +    return (-ENOENT);
 +  }
 +
 +  if (e->type != MD_TYPE_BOOLEAN)
 +  {
 +    ERROR ("meta_data_get_boolean: Type mismatch for key `%s'", e->key);
 +    pthread_mutex_unlock (&md->lock);
 +    return (-ENOENT);
 +  }
 +
 +  *value = e->value.mv_boolean;
 +
 +  pthread_mutex_unlock (&md->lock);
 +  return (0);
 +} /* }}} int meta_data_get_boolean */
 +
  /* vim: set sw=2 sts=2 et fdm=marker : */
diff --combined src/plugin.c
@@@ -51,6 -51,7 +51,7 @@@ typedef struct callback_func_s callback
  
  #define RF_SIMPLE  0
  #define RF_COMPLEX 1
+ #define RF_REMOVE  65535
  struct read_func_s
  {
        /* `read_func_t' "inherits" from `callback_func_t'.
@@@ -84,6 -85,7 +85,7 @@@ static c_avl_tree_t *data_sets
  static char *plugindir = NULL;
  
  static c_heap_t       *read_heap = NULL;
+ static llist_t        *read_list;
  static int             read_loop = 1;
  static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
@@@ -307,6 -309,7 +309,7 @@@ static void *plugin_read_thread (void _
                read_func_t *rf;
                struct timeval now;
                int status;
+               int rf_type;
  
                /* Get the read function that needs to be read next. */
                rf = c_head_get_root (read_heap);
                pthread_mutex_lock (&read_lock);
                pthread_cond_timedwait (&read_cond, &read_lock,
                                &rf->rf_next_read);
+               /* Must hold `real_lock' when accessing `rf->rf_type'. */
+               rf_type = rf->rf_type;
                pthread_mutex_unlock (&read_lock);
  
                /* Check if we're supposed to stop.. This may have interrupted
                        break;
                }
  
+               /* The entry has been marked for deletion. The linked list
+                * entry has already been removed by `plugin_unregister_read'.
+                * All we have to do here is free the `read_func_t' and
+                * continue. */
+               if (rf_type == RF_REMOVE)
+               {
+                       DEBUG ("plugin_read_thread: Destroying the `%s' "
+                                       "callback.", rf->rf_name);
+                       destroy_callback ((callback_func_t *) rf);
+                       rf = NULL;
+                       continue;
+               }
                DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
  
-               if (rf->rf_type == RF_SIMPLE)
+               if (rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
  
                {
                        plugin_read_cb callback;
  
+                       assert (rf_type == RF_COMPLEX);
                        callback = rf->rf_callback;
                        status = (*callback) (&rf->rf_udata);
                }
@@@ -641,22 -661,67 +661,67 @@@ static int plugin_compare_read_func (co
                return (0);
  } /* int plugin_compare_read_func */
  
- int plugin_register_read (const char *name,
-               int (*callback) (void))
+ /* Add a read function to both, the heap and a linked list. The linked list if
+  * used to look-up read functions, especially for the remove function. The heap
+  * is used to determine which plugin to read next. */
+ static int plugin_insert_read (read_func_t *rf)
  {
-       read_func_t *rf;
+       int status;
+       llentry_t *le;
+       pthread_mutex_lock (&read_lock);
+       if (read_list == NULL)
+       {
+               read_list = llist_create ();
+               if (read_list == NULL)
+               {
+                       pthread_mutex_unlock (&read_lock);
+                       ERROR ("plugin_insert_read: read_list failed.");
+                       return (-1);
+               }
+       }
  
        if (read_heap == NULL)
        {
                read_heap = c_heap_create (plugin_compare_read_func);
                if (read_heap == NULL)
                {
-                       ERROR ("plugin_register_read: "
-                                       "c_heap_create failed.");
+                       pthread_mutex_unlock (&read_lock);
+                       ERROR ("plugin_insert_read: c_heap_create failed.");
                        return (-1);
                }
        }
  
+       le = llentry_create (rf->rf_name, rf);
+       if (le == NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               ERROR ("plugin_insert_read: llentry_create failed.");
+               return (-1);
+       }
+       status = c_heap_insert (read_heap, rf);
+       if (status != 0)
+       {
+               pthread_mutex_unlock (&read_lock);
+               ERROR ("plugin_insert_read: c_heap_insert failed.");
+               llentry_destroy (le);
+               return (-1);
+       }
+       /* This does not fail. */
+       llist_append (read_list, le);
+       pthread_mutex_unlock (&read_lock);
+       return (0);
+ } /* int plugin_insert_read */
+ int plugin_register_read (const char *name,
+               int (*callback) (void))
+ {
+       read_func_t *rf;
        rf = (read_func_t *) malloc (sizeof (read_func_t));
        if (rf == NULL)
        {
        rf->rf_interval.tv_nsec = 0;
        rf->rf_effective_interval = rf->rf_interval;
  
-       return (c_heap_insert (read_heap, rf));
+       return (plugin_insert_read (rf));
  } /* int plugin_register_read */
  
  int plugin_register_complex_read (const char *name,
  {
        read_func_t *rf;
  
-       if (read_heap == NULL)
-       {
-               read_heap = c_heap_create (plugin_compare_read_func);
-               if (read_heap == NULL)
-               {
-                       ERROR ("plugin_register_read: c_heap_create failed.");
-                       return (-1);
-               }
-       }
        rf = (read_func_t *) malloc (sizeof (read_func_t));
        if (rf == NULL)
        {
                rf->rf_udata = *user_data;
        }
  
-       return (c_heap_insert (read_heap, rf));
+       return (plugin_insert_read (rf));
  } /* int plugin_register_complex_read */
  
  int plugin_register_write (const char *name,
@@@ -816,12 -871,37 +871,37 @@@ int plugin_unregister_init (const char 
        return (plugin_unregister (list_init, name));
  }
  
- int plugin_unregister_read (const char *name)
+ int plugin_unregister_read (const char *name) /* {{{ */
  {
-       /* TODO: Implement removal of a specific key from the heap. */
-       assert (0);
-       return (-1);
- }
+       llentry_t *le;
+       read_func_t *rf;
+       if (name == NULL)
+               return (-ENOENT);
+       pthread_mutex_lock (&read_lock);
+       if (read_list == NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               return (-ENOENT);
+       }
+       le = llist_search (read_list, name);
+       llist_remove (read_list, le);
+       rf = le->value;
+       assert (rf != NULL);
+       rf->rf_type = RF_REMOVE;
+       pthread_mutex_unlock (&read_lock);
+       llentry_destroy (le);
+       DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
+       return (0);
+ } /* }}} int plugin_unregister_read */
  
  int plugin_unregister_write (const char *name)
  {
@@@ -1093,6 -1173,12 +1173,12 @@@ void plugin_shutdown_all (void
        stop_read_threads ();
  
        destroy_all_callbacks (&list_init);
+       pthread_mutex_lock (&read_lock);
+       llist_destroy (read_list);
+       read_list = NULL;
+       pthread_mutex_unlock (&read_lock);
        destroy_read_heap ();
  
        plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
@@@ -1136,8 -1222,6 +1222,8 @@@ int plugin_dispatch_values (value_list_
  
        data_set_t *ds;
  
 +      int free_meta_data = 0;
 +
        if ((vl == NULL) || (vl->type[0] == 0)
                        || (vl->values == NULL) || (vl->values_len < 1))
        {
                return (-1);
        }
  
 +      /* Free meta data only if the calling function didn't specify any. In
 +       * this case matches and targets may add some and the calling function
 +       * may not expect (and therefore free) that data. */
 +      if (vl->meta == NULL)
 +              free_meta_data = 1;
 +
        if (list_write == NULL)
                c_complain_once (LOG_WARNING, &no_write_complaint,
                                "plugin_dispatch_values: No write callback has been "
                vl->values_len = saved_values_len;
        }
  
 +      if ((free_meta_data != 0) && (vl->meta != NULL))
 +      {
 +              meta_data_destroy (vl->meta);
 +              vl->meta = NULL;
 +      }
 +
        return (0);
  } /* int plugin_dispatch_values */
  
diff --combined src/utils_cache.c
@@@ -25,7 -25,6 +25,7 @@@
  #include "utils_avltree.h"
  #include "utils_cache.h"
  #include "utils_threshold.h"
 +#include "meta_data.h"
  
  #include <assert.h>
  #include <pthread.h>
@@@ -35,7 -34,7 +35,7 @@@ typedef struct cache_entry_
        char name[6 * DATA_MAX_NAME_LEN];
        int        values_num;
        gauge_t   *values_gauge;
 -      counter_t *values_counter;
 +      value_t   *values_raw;
        /* Time contained in the package
         * (for calculating rates) */
        time_t last_time;
         * (for purding old entries) */
        int interval;
        int state;
 +
 +      /*
 +       * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
 +       * !  0  !  1  !  2  !  3  !  4  !  5  !  6  !  7  !  8  ! ...
 +       * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
 +       * ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ds0 ! ds1 ! ds2 ! ...
 +       * +-----+-----+-----+-----+-----+-----+-----+-----+-----+----
 +       * !      t = 0      !      t = 1      !      t = 2      ! ...
 +       * +-----------------+-----------------+-----------------+----
 +       */
 +      gauge_t *history;
 +      size_t   history_index; /* points to the next position to write to. */
 +      size_t   history_length;
 +
 +      meta_data_t *meta;
  } cache_entry_t;
  
  static c_avl_tree_t   *cache_tree = NULL;
@@@ -85,21 -69,17 +85,21 @@@ static cache_entry_t *cache_alloc (int 
    memset (ce, '\0', sizeof (cache_entry_t));
    ce->values_num = values_num;
  
 -  ce->values_gauge = (gauge_t *) calloc (values_num, sizeof (gauge_t));
 -  ce->values_counter = (counter_t *) calloc (values_num, sizeof (counter_t));
 -  if ((ce->values_gauge == NULL) || (ce->values_counter == NULL))
 +  ce->values_gauge = calloc (values_num, sizeof (*ce->values_gauge));
 +  ce->values_raw   = calloc (values_num, sizeof (*ce->values_raw));
 +  if ((ce->values_gauge == NULL) || (ce->values_raw == NULL))
    {
      sfree (ce->values_gauge);
 -    sfree (ce->values_counter);
 +    sfree (ce->values_raw);
      sfree (ce);
      ERROR ("utils_cache: cache_alloc: calloc failed.");
      return (NULL);
    }
  
 +  ce->history = NULL;
 +  ce->history_length = 0;
 +  ce->meta = NULL;
 +
    return (ce);
  } /* cache_entry_t *cache_alloc */
  
@@@ -109,13 -89,7 +109,13 @@@ static void cache_free (cache_entry_t *
      return;
  
    sfree (ce->values_gauge);
 -  sfree (ce->values_counter);
 +  sfree (ce->values_raw);
 +  sfree (ce->history);
 +  if (ce->meta != NULL)
 +  {
 +    meta_data_destroy (ce->meta);
 +    ce->meta = NULL;
 +  }
    sfree (ce);
  } /* void cache_free */
  
@@@ -193,21 -167,6 +193,21 @@@ static int uc_send_notification (const 
    return (0);
  } /* int uc_send_notification */
  
 +static void uc_check_range (const data_set_t *ds, cache_entry_t *ce)
 +{
 +  int i;
 +
 +  for (i = 0; i < ds->ds_num; i++)
 +  {
 +    if (isnan (ce->values_gauge[i]))
 +      continue;
 +    else if (ce->values_gauge[i] < ds->ds[i].min)
 +      ce->values_gauge[i] = NAN;
 +    else if (ce->values_gauge[i] > ds->ds[i].max)
 +      ce->values_gauge[i] = NAN;
 +  }
 +} /* void uc_check_range */
 +
  static int uc_insert (const data_set_t *ds, const value_list_t *vl,
      const char *key)
  {
  
    for (i = 0; i < ds->ds_num; i++)
    {
 -    if (ds->ds[i].type == DS_TYPE_COUNTER)
 -    {
 -      ce->values_gauge[i] = NAN;
 -      ce->values_counter[i] = vl->values[i].counter;
 -    }
 -    else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
 +    switch (ds->ds[i].type)
      {
 -      ce->values_gauge[i] = vl->values[i].gauge;
 -    }
 +      case DS_TYPE_COUNTER:
 +      ce->values_gauge[i] = NAN;
 +      ce->values_raw[i].counter = vl->values[i].counter;
 +      break;
 +
 +      case DS_TYPE_GAUGE:
 +      ce->values_gauge[i] = vl->values[i].gauge;
 +      ce->values_raw[i].gauge = vl->values[i].gauge;
 +      break;
 +
 +      case DS_TYPE_DERIVE:
 +      ce->values_gauge[i] = NAN;
 +      ce->values_raw[i].derive = vl->values[i].derive;
 +      break;
 +
 +      case DS_TYPE_ABSOLUTE:
 +      ce->values_gauge[i] = NAN;
 +      if (vl->interval > 0)
 +        ce->values_gauge[i] = ((double) vl->values[i].absolute)
 +          / ((double) vl->interval);
 +      ce->values_raw[i].absolute = vl->values[i].absolute;
 +      break;
 +      
 +      default:
 +      /* This shouldn't happen. */
 +      ERROR ("uc_insert: Don't know how to handle data source type %i.",
 +          ds->ds[i].type);
 +      return (-1);
 +    } /* switch (ds->ds[i].type) */
    } /* for (i) */
  
 +  /* Prune invalid gauge data */
 +  uc_check_range (ds, ce);
 +
    ce->last_time = vl->time;
    ce->last_update = time (NULL);
    ce->interval = vl->interval;
@@@ -362,6 -296,7 +362,7 @@@ int uc_check_timeout (void
      {
        DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
          keys[i]);
+       ce = NULL;
        status = c_avl_remove (cache_tree, keys[i],
          (void *) &key, (void *) &ce);
        if (status != 0)
        }
        sfree (keys[i]);
        sfree (key);
-       cache_free (ce);
+       if (ce != NULL)
+         cache_free (ce);
        continue;
      }
  
@@@ -450,7 -386,7 +452,7 @@@ int uc_update (const data_set_t *ds, co
    int status;
    int i;
  
 -  if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
 +  if (FORMAT_VL (name, sizeof (name), vl) != 0)
    {
      ERROR ("uc_update: FORMAT_VL failed.");
      return (-1);
  
    for (i = 0; i < ds->ds_num; i++)
    {
 -    if (ds->ds[i].type == DS_TYPE_COUNTER)
 +    switch (ds->ds[i].type)
      {
 -      counter_t diff;
 +      case DS_TYPE_COUNTER:
 +      {
 +        counter_t diff;
 +
 +        /* check if the counter has wrapped around */
 +        if (vl->values[i].counter < ce->values_raw[i].counter)
 +        {
 +          if (ce->values_raw[i].counter <= 4294967295U)
 +            diff = (4294967295U - ce->values_raw[i].counter)
 +              + vl->values[i].counter;
 +          else
 +            diff = (18446744073709551615ULL - ce->values_raw[i].counter)
 +              + vl->values[i].counter;
 +        }
 +        else /* counter has NOT wrapped around */
 +        {
 +          diff = vl->values[i].counter - ce->values_raw[i].counter;
 +        }
 +
 +        ce->values_gauge[i] = ((double) diff)
 +          / ((double) (vl->time - ce->last_time));
 +        ce->values_raw[i].counter = vl->values[i].counter;
 +      }
 +      break;
  
 -      /* check if the counter has wrapped around */
 -      if (vl->values[i].counter < ce->values_counter[i])
 -      {
 -      if (ce->values_counter[i] <= 4294967295U)
 -        diff = (4294967295U - ce->values_counter[i])
 -          + vl->values[i].counter;
 -      else
 -        diff = (18446744073709551615ULL - ce->values_counter[i])
 -          + vl->values[i].counter;
 -      }
 -      else /* counter has NOT wrapped around */
 -      {
 -      diff = vl->values[i].counter - ce->values_counter[i];
 -      }
 +      case DS_TYPE_GAUGE:
 +      ce->values_raw[i].gauge = vl->values[i].gauge;
 +      ce->values_gauge[i] = vl->values[i].gauge;
 +      break;
 +
 +      case DS_TYPE_DERIVE:
 +      {
 +        derive_t diff;
 +
 +        diff = vl->values[i].derive - ce->values_raw[i].derive;
 +
 +        ce->values_gauge[i] = ((double) diff)
 +          / ((double) (vl->time - ce->last_time));
 +        ce->values_raw[i].derive = vl->values[i].derive;
 +      }
 +      break;
 +
 +      case DS_TYPE_ABSOLUTE:
 +      ce->values_gauge[i] = ((double) vl->values[i].absolute)
 +        / ((double) (vl->time - ce->last_time));
 +      ce->values_raw[i].absolute = vl->values[i].absolute;
 +      break;
 +
 +      default:
 +      /* This shouldn't happen. */
 +      pthread_mutex_unlock (&cache_lock);
 +      ERROR ("uc_update: Don't know how to handle data source type %i.",
 +          ds->ds[i].type);
 +      return (-1);
 +    } /* switch (ds->ds[i].type) */
  
 -      ce->values_gauge[i] = ((double) diff)
 -      / ((double) (vl->time - ce->last_time));
 -      ce->values_counter[i] = vl->values[i].counter;
 -    }
 -    else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
 -    {
 -      ce->values_gauge[i] = vl->values[i].gauge;
 -    }
      DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
    } /* for (i) */
  
 +  /* Update the history if it exists. */
 +  if (ce->history != NULL)
 +  {
 +    assert (ce->history_index < ce->history_length);
 +    for (i = 0; i < ce->values_num; i++)
 +    {
 +      size_t hist_idx = (ce->values_num * ce->history_index) + i;
 +      ce->history[hist_idx] = ce->values_gauge[i];
 +    }
 +
 +    assert (ce->history_length > 0);
 +    ce->history_index = (ce->history_index + 1) % ce->history_length;
 +  }
 +
 +  /* Prune invalid gauge data */
 +  uc_check_range (ds, ce);
 +
    ce->last_time = vl->time;
    ce->last_update = time (NULL);
    ce->interval = vl->interval;
@@@ -611,16 -499,24 +613,24 @@@ int uc_get_rate_by_name (const char *na
    {
      assert (ce != NULL);
  
-     ret_num = ce->values_num;
-     ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
-     if (ret == NULL)
+     /* remove missing values from getval */
+     if (ce->state == STATE_MISSING)
      {
-       ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
        status = -1;
      }
      else
      {
-       memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
+       ret_num = ce->values_num;
+       ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
+       if (ret == NULL)
+       {
+         ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
+         status = -1;
+       }
+       else
+       {
+         memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
+       }
      }
    }
    else
@@@ -647,7 -543,7 +657,7 @@@ gauge_t *uc_get_rate (const data_set_t 
    size_t ret_num = 0;
    int status;
  
 -  if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
 +  if (FORMAT_VL (name, sizeof (name), vl) != 0)
    {
      ERROR ("utils_cache: uc_get_rate: FORMAT_VL failed.");
      return (NULL);
@@@ -693,6 -589,10 +703,10 @@@ int uc_get_names (char ***ret_names, ti
    {
      char **temp;
  
+     /* remove missing values when list values */
+     if (value->state == STATE_MISSING)
+       continue;
      if (ret_times != NULL)
      {
        time_t *tmp_times;
@@@ -753,7 -653,7 +767,7 @@@ int uc_get_state (const data_set_t *ds
    cache_entry_t *ce = NULL;
    int ret = STATE_ERROR;
  
 -  if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
 +  if (FORMAT_VL (name, sizeof (name), vl) != 0)
    {
      ERROR ("uc_get_state: FORMAT_VL failed.");
      return (STATE_ERROR);
@@@ -778,7 -678,7 +792,7 @@@ int uc_set_state (const data_set_t *ds
    cache_entry_t *ce = NULL;
    int ret = -1;
  
 -  if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
 +  if (FORMAT_VL (name, sizeof (name), vl) != 0)
    {
      ERROR ("uc_get_state: FORMAT_VL failed.");
      return (STATE_ERROR);
  
    return (ret);
  } /* int uc_set_state */
 +
 +int uc_get_history_by_name (const char *name,
 +    gauge_t *ret_history, size_t num_steps, size_t num_ds)
 +{
 +  cache_entry_t *ce = NULL;
 +  size_t i;
 +  int status = 0;
 +
 +  pthread_mutex_lock (&cache_lock);
 +
 +  status = c_avl_get (cache_tree, name, (void *) &ce);
 +  if (status != 0)
 +  {
 +    pthread_mutex_unlock (&cache_lock);
 +    return (-ENOENT);
 +  }
 +
 +  if (((size_t) ce->values_num) != num_ds)
 +  {
 +    pthread_mutex_unlock (&cache_lock);
 +    return (-EINVAL);
 +  }
 +
 +  /* Check if there are enough values available. If not, increase the buffer
 +   * size. */
 +  if (ce->history_length < num_steps)
 +  {
 +    gauge_t *tmp;
 +    size_t i;
 +
 +    tmp = realloc (ce->history, sizeof (*ce->history)
 +      * num_steps * ce->values_num);
 +    if (tmp == NULL)
 +    {
 +      pthread_mutex_unlock (&cache_lock);
 +      return (-ENOMEM);
 +    }
 +
 +    for (i = ce->history_length * ce->values_num;
 +      i < (num_steps * ce->values_num);
 +      i++)
 +      tmp[i] = NAN;
 +
 +    ce->history = tmp;
 +    ce->history_length = num_steps;
 +  } /* if (ce->history_length < num_steps) */
 +
 +  /* Copy the values to the output buffer. */
 +  for (i = 0; i < num_steps; i++)
 +  {
 +    size_t src_index;
 +    size_t dst_index;
 +
 +    if (i < ce->history_index)
 +      src_index = ce->history_index - (i + 1);
 +    else
 +      src_index = ce->history_length + ce->history_index - (i + 1);
 +    src_index = src_index * num_ds;
 +
 +    dst_index = i * num_ds;
 +
 +    memcpy (ret_history + dst_index, ce->history + src_index,
 +      sizeof (*ret_history) * num_ds);
 +  }
 +
 +  pthread_mutex_unlock (&cache_lock);
 +
 +  return (0);
 +} /* int uc_get_history_by_name */
 +
 +int uc_get_history (const data_set_t *ds, const value_list_t *vl,
 +    gauge_t *ret_history, size_t num_steps, size_t num_ds)
 +{
 +  char name[6 * DATA_MAX_NAME_LEN];
 +
 +  if (FORMAT_VL (name, sizeof (name), vl) != 0)
 +  {
 +    ERROR ("utils_cache: uc_get_history: FORMAT_VL failed.");
 +    return (-1);
 +  }
 +
 +  return (uc_get_history_by_name (name, ret_history, num_steps, num_ds));
 +} /* int uc_get_history */
 +
 +/*
 + * Meta data interface
 + */
 +/* XXX: This function will acquire `cache_lock' but will not free it! */
 +static meta_data_t *uc_get_meta (const value_list_t *vl) /* {{{ */
 +{
 +  char name[6 * DATA_MAX_NAME_LEN];
 +  cache_entry_t *ce = NULL;
 +  int status;
 +
 +  status = FORMAT_VL (name, sizeof (name), vl);
 +  if (status != 0)
 +  {
 +    ERROR ("utils_cache: uc_get_meta: FORMAT_VL failed.");
 +    return (NULL);
 +  }
 +
 +  pthread_mutex_lock (&cache_lock);
 +
 +  status = c_avl_get (cache_tree, name, (void *) &ce);
 +  if (status != 0)
 +  {
 +    pthread_mutex_unlock (&cache_lock);
 +    return (NULL);
 +  }
 +  assert (ce != NULL);
 +
 +  if (ce->meta == NULL)
 +    ce->meta = meta_data_create ();
 +
 +  return (ce->meta);
 +} /* }}} meta_data_t *uc_get_meta */
 +
 +/* Sorry about this preprocessor magic, but it really makes this file much
 + * shorter.. */
 +#define UC_WRAP(wrap_function) { \
 +  meta_data_t *meta; \
 +  int status; \
 +  meta = uc_get_meta (vl); \
 +  if (meta == NULL) return (-1); \
 +  status = wrap_function (meta, key); \
 +  pthread_mutex_unlock (&cache_lock); \
 +  return (status); \
 +}
 +int uc_meta_data_exists (const value_list_t *vl, const char *key)
 +  UC_WRAP (meta_data_exists)
 +
 +int uc_meta_data_delete (const value_list_t *vl, const char *key)
 +  UC_WRAP (meta_data_delete)
 +#undef UC_WRAP
 +
 +/* We need a new version of this macro because the following functions take
 + * two argumetns. */
 +#define UC_WRAP(wrap_function) { \
 +  meta_data_t *meta; \
 +  int status; \
 +  meta = uc_get_meta (vl); \
 +  if (meta == NULL) return (-1); \
 +  status = wrap_function (meta, key, value); \
 +  pthread_mutex_unlock (&cache_lock); \
 +  return (status); \
 +}
 +int uc_meta_data_add_string (const value_list_t *vl,
 +    const char *key,
 +    const char *value)
 +  UC_WRAP(meta_data_add_string)
 +int uc_meta_data_add_signed_int (const value_list_t *vl,
 +    const char *key,
 +    int64_t value)
 +  UC_WRAP(meta_data_add_signed_int)
 +int uc_meta_data_add_unsigned_int (const value_list_t *vl,
 +    const char *key,
 +    uint64_t value)
 +  UC_WRAP(meta_data_add_unsigned_int)
 +int uc_meta_data_add_double (const value_list_t *vl,
 +    const char *key,
 +    double value)
 +  UC_WRAP(meta_data_add_double)
 +int uc_meta_data_add_boolean (const value_list_t *vl,
 +    const char *key,
 +    _Bool value)
 +  UC_WRAP(meta_data_add_boolean)
 +
 +int uc_meta_data_get_string (const value_list_t *vl,
 +    const char *key,
 +    char **value)
 +  UC_WRAP(meta_data_get_string)
 +int uc_meta_data_get_signed_int (const value_list_t *vl,
 +    const char *key,
 +    int64_t *value)
 +  UC_WRAP(meta_data_get_signed_int)
 +int uc_meta_data_get_unsigned_int (const value_list_t *vl,
 +    const char *key,
 +    uint64_t *value)
 +  UC_WRAP(meta_data_get_unsigned_int)
 +int uc_meta_data_get_double (const value_list_t *vl,
 +    const char *key,
 +    double *value)
 +  UC_WRAP(meta_data_get_double)
 +int uc_meta_data_get_boolean (const value_list_t *vl,
 +    const char *key,
 +    _Bool *value)
 +  UC_WRAP(meta_data_get_boolean)
 +#undef UC_WRAP
 +
  /* vim: set sw=2 ts=8 sts=2 tw=78 : */