Merge branch 'collectd-5.1' into collectd-5.2
authorFlorian Forster <octo@collectd.org>
Sat, 16 Mar 2013 12:43:26 +0000 (13:43 +0100)
committerFlorian Forster <octo@collectd.org>
Sat, 16 Mar 2013 12:43:26 +0000 (13:43 +0100)
1  2 
src/exec.c
src/plugin.c

diff --combined src/exec.c
@@@ -270,15 -270,14 +270,15 @@@ static void set_environment (void) /* {
    char buffer[1024];
  
  #ifdef HAVE_SETENV
 -  ssnprintf (buffer, sizeof (buffer), "%.3f", CDTIME_T_TO_DOUBLE (interval_g));
 +  ssnprintf (buffer, sizeof (buffer), "%.3f",
 +      CDTIME_T_TO_DOUBLE (plugin_get_interval ()));
    setenv ("COLLECTD_INTERVAL", buffer, /* overwrite = */ 1);
  
    ssnprintf (buffer, sizeof (buffer), "%s", hostname_g);
    setenv ("COLLECTD_HOSTNAME", buffer, /* overwrite = */ 1);
  #else
    ssnprintf (buffer, sizeof (buffer), "COLLECTD_INTERVAL=%.3f",
 -      CDTIME_T_TO_DOUBLE (interval_g));
 +      CDTIME_T_TO_DOUBLE (plugin_get_interval ()));
    putenv (buffer);
  
    ssnprintf (buffer, sizeof (buffer), "COLLECTD_HOSTNAME=%s", hostname_g);
@@@ -493,7 -492,7 +493,7 @@@ static int fork_child (program_list_t *
        close (fd_pipe_out[1]);
      }
  
-     /* Now connect the `out' pipe to STDOUT */
+     /* Now connect the `err' pipe to STDERR */
      if (fd_pipe_err[1] != STDERR_FILENO)
      {
        dup2 (fd_pipe_err[1], STDERR_FILENO);
@@@ -827,7 -826,7 +827,7 @@@ static int exec_read (void) /* {{{ *
  
      pthread_attr_init (&attr);
      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
 -    pthread_create (&t, &attr, exec_read_one, (void *) pl);
 +    plugin_thread_create (&t, &attr, exec_read_one, (void *) pl);
      pthread_attr_destroy (&attr);
    } /* for (pl) */
  
@@@ -871,7 -870,7 +871,7 @@@ static int exec_notification (const not
  
      pthread_attr_init (&attr);
      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
 -    pthread_create (&t, &attr, exec_notification_one, (void *) pln);
 +    plugin_thread_create (&t, &attr, exec_notification_one, (void *) pln);
      pthread_attr_destroy (&attr);
    } /* for (pl) */
  
diff --combined src/plugin.c
@@@ -45,7 -45,6 +45,7 @@@ struct callback_func_
  {
        void *cf_callback;
        user_data_t cf_udata;
 +      plugin_ctx_t cf_ctx;
  };
  typedef struct callback_func_s callback_func_t;
  
@@@ -58,7 -57,6 +58,7 @@@ struct read_func_
         * The `rf_super' member MUST be the first one in this structure! */
  #define rf_callback rf_super.cf_callback
  #define rf_udata rf_super.cf_udata
 +#define rf_ctx rf_super.cf_ctx
        callback_func_t rf_super;
        char rf_group[DATA_MAX_NAME_LEN];
        char rf_name[DATA_MAX_NAME_LEN];
@@@ -95,9 -93,6 +95,9 @@@ static pthread_cond_t  read_cond = PTHR
  static pthread_t      *read_threads = NULL;
  static int             read_threads_num = 0;
  
 +static pthread_key_t   plugin_ctx_key;
 +static _Bool           plugin_ctx_key_initialized = 0;
 +
  /*
   * Static functions
   */
@@@ -251,8 -246,6 +251,8 @@@ static int create_register_callback (ll
                cf->cf_udata = *ud;
        }
  
 +      cf->cf_ctx = plugin_get_ctx ();
 +
        return (register_callback (list, name, cf));
  } /* }}} int create_register_callback */
  
@@@ -298,7 -291,7 +298,7 @@@ static int plugin_load_file (char *file
                dlh = lt_dlopenadvise(file, advise);
                lt_dladvise_destroy(&advise);
        } else {
 -              dlh = lt_dlopen (file);
 +              dlh = lt_dlopen (file);
        }
  #else /* if LIBTOOL_VERSION == 1 */
        if (flags & PLUGIN_FLAGS_GLOBAL)
@@@ -353,34 -346,33 +353,34 @@@ static void *plugin_read_thread (void _
        while (read_loop != 0)
        {
                read_func_t *rf;
 +              plugin_ctx_t old_ctx;
                cdtime_t now;
                int status;
                int rf_type;
                int rc;
  
 -              /* Get the read function that needs to be read next. */
 +              /* Get the read function that needs to be read next.
 +               * We don't need to hold "read_lock" for the heap, but we need
 +               * to call c_heap_get_root() and pthread_cond_wait() in the
 +               * same protected block. */
 +              pthread_mutex_lock (&read_lock);
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                {
 -                      struct timespec abstime;
 -
 -                      now = cdtime ();
 -
 -                      CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime);
 -
 -                      pthread_mutex_lock (&read_lock);
 -                      pthread_cond_timedwait (&read_cond, &read_lock,
 -                                      &abstime);
 -                      pthread_mutex_unlock (&read_lock);
 +                      pthread_cond_wait (&read_cond, &read_lock);
 +                        pthread_mutex_unlock (&read_lock);
                        continue;
                }
 +              pthread_mutex_unlock (&read_lock);
  
                if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
                {
 +                      /* this should not happen, because the interval is set
 +                       * for each plugin when loading it
 +                       * XXX: issue a warning? */
                        now = cdtime ();
  
 -                      CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval);
 +                      CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval);
  
                        rf->rf_effective_interval = rf->rf_interval;
  
  
                DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
  
 +              old_ctx = plugin_set_ctx (rf->rf_ctx);
 +
                if (rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
                        status = (*callback) (&rf->rf_udata);
                }
  
 +              plugin_set_ctx (old_ctx);
 +
                /* If the function signals failure, we will increase the
                 * intervals in which it will be called. */
                if (status != 0)
@@@ -726,6 -714,9 +726,9 @@@ static int plugin_insert_read (read_fun
        int status;
        llentry_t *le;
  
+       cdtime_t now = cdtime ();
+       CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
        pthread_mutex_lock (&read_lock);
  
        if (read_list == NULL)
        /* This does not fail. */
        llist_append (read_list, le);
  
 +      /* Wake up all the read threads. */
 +      pthread_cond_broadcast (&read_cond);
        pthread_mutex_unlock (&read_lock);
        return (0);
  } /* int plugin_insert_read */
  
 +static int read_cb_wrapper (user_data_t *ud)
 +{
 +      int (*callback) (void);
 +
 +      if (ud == NULL)
 +              return -1;
 +
 +      callback = ud->data;
 +      return callback();
 +} /* int read_cb_wrapper */
 +
  int plugin_register_read (const char *name,
                int (*callback) (void))
  {
        read_func_t *rf;
 +      plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
  
 +      if (ctx.interval != 0) {
 +              /* If ctx.interval is not zero (== use the plugin or global
 +               * interval), we need to use the "complex" read callback,
 +               * because only that allows to specify a different interval.
 +               * Wrap the callback using read_cb_wrapper(). */
 +              struct timespec interval;
 +              user_data_t user_data;
 +
 +              user_data.data = callback;
 +              user_data.free_func = NULL;
 +
 +              CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
 +              return plugin_register_complex_read (/* group = */ NULL,
 +                              name, read_cb_wrapper, &interval, &user_data);
 +      }
 +
 +      DEBUG ("plugin_register_read: default_interval = %.3f",
 +                      CDTIME_T_TO_DOUBLE(plugin_get_interval ()));
 +
        rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
 +      rf->rf_ctx = ctx;
        rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_SIMPLE;
@@@ -856,7 -813,6 +859,7 @@@ int plugin_register_complex_read (cons
                user_data_t *user_data)
  {
        read_func_t *rf;
 +      plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
  
        rf = malloc (sizeof (*rf));
        {
                rf->rf_interval = *interval;
        }
 +      else if (ctx.interval != 0)
 +      {
 +              CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
 +      }
        rf->rf_effective_interval = rf->rf_interval;
  
 +      DEBUG ("plugin_register_read: interval = %i.%09i",
 +                      (int) rf->rf_interval.tv_sec,
 +                      (int) rf->rf_interval.tv_nsec);
 +
        /* Set user data */
        if (user_data == NULL)
        {
                rf->rf_udata = *user_data;
        }
  
 +      rf->rf_ctx = ctx;
 +
        status = plugin_insert_read (rf);
        if (status != 0)
                sfree (rf);
@@@ -1178,13 -1124,10 +1181,13 @@@ void plugin_init_all (void
        {
                callback_func_t *cf;
                plugin_init_cb callback;
 +              plugin_ctx_t old_ctx;
  
                cf = le->value;
 +              old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
                status = (*callback) ();
 +              plugin_set_ctx (old_ctx);
  
                if (status != 0)
                {
@@@ -1237,14 -1180,11 +1240,14 @@@ int plugin_read_all_once (void
        while (42)
        {
                read_func_t *rf;
 +              plugin_ctx_t old_ctx;
  
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                        break;
  
 +              old_ctx = plugin_set_ctx (rf->rf_ctx);
 +
                if (rf->rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
                        status = (*callback) (&rf->rf_udata);
                }
  
 +              plugin_set_ctx (old_ctx);
 +
                if (status != 0)
                {
                        NOTICE ("read-function of plugin `%s' failed.",
@@@ -1308,9 -1246,6 +1311,9 @@@ int plugin_write (const char *plugin, /
        callback_func_t *cf = le->value;
        plugin_write_cb callback;
  
 +      /* do not switch plugin context; rather keep the context (interval)
 +       * information of the calling read plugin */
 +
        DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
        callback = cf->cf_callback;
        status = (*callback) (ds, vl, &cf->cf_udata);
  
      cf = le->value;
  
 +    /* do not switch plugin context; rather keep the context (interval)
 +     * information of the calling read plugin */
 +
      DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
      callback = cf->cf_callback;
      status = (*callback) (ds, vl, &cf->cf_udata);
@@@ -1369,7 -1301,6 +1372,7 @@@ int plugin_flush (const char *plugin, c
    {
      callback_func_t *cf;
      plugin_flush_cb callback;
 +    plugin_ctx_t old_ctx;
  
      if ((plugin != NULL)
          && (strcmp (plugin, le->key) != 0))
      }
  
      cf = le->value;
 +    old_ctx = plugin_set_ctx (cf->cf_ctx);
      callback = cf->cf_callback;
  
      (*callback) (timeout, identifier, &cf->cf_udata);
  
 +    plugin_set_ctx (old_ctx);
 +
      le = le->next;
    }
    return (0);
@@@ -1418,10 -1346,8 +1421,10 @@@ void plugin_shutdown_all (void
        {
                callback_func_t *cf;
                plugin_shutdown_cb callback;
 +              plugin_ctx_t old_ctx;
  
                cf = le->value;
 +              old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
  
                /* Advance the pointer before calling the callback allows
                le = le->next;
  
                (*callback) ();
 +
 +              plugin_set_ctx (old_ctx);
        }
  
        /* Write plugins which use the `user_data' pointer usually need the
@@@ -1461,15 -1385,12 +1464,15 @@@ int plugin_dispatch_missing (const valu
    {
      callback_func_t *cf;
      plugin_missing_cb callback;
 +    plugin_ctx_t old_ctx;
      int status;
  
      cf = le->value;
 +    old_ctx = plugin_set_ctx (cf->cf_ctx);
      callback = cf->cf_callback;
  
      status = (*callback) (vl, &cf->cf_udata);
 +    plugin_set_ctx (old_ctx);
      if (status != 0)
      {
        if (status < 0)
@@@ -1545,25 -1466,7 +1548,25 @@@ int plugin_dispatch_values (value_list_
                vl->time = cdtime ();
  
        if (vl->interval <= 0)
 -              vl->interval = interval_g;
 +      {
 +              plugin_ctx_t ctx = plugin_get_ctx ();
 +
 +              if (ctx.interval != 0)
 +                      vl->interval = ctx.interval;
 +              else
 +              {
 +                      char name[6 * DATA_MAX_NAME_LEN];
 +                      FORMAT_VL (name, sizeof (name), vl);
 +                      ERROR ("plugin_dispatch_values: Unable to determine "
 +                                      "interval from context for "
 +                                      "value list \"%s\". "
 +                                      "This indicates a broken plugin. "
 +                                      "Please report this problem to the "
 +                                      "collectd mailing list or at "
 +                                      "<http://collectd.org/bugs/>.", name);
 +                      vl->interval = cf_get_default_interval ();
 +              }
 +      }
  
        DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
                        "host = %s; "
@@@ -1753,9 -1656,6 +1756,9 @@@ int plugin_dispatch_notification (cons
                plugin_notification_cb callback;
                int status;
  
 +              /* do not switch plugin context; rather keep the context
 +               * (interval) information of the calling plugin */
 +
                cf = le->value;
                callback = cf->cf_callback;
                status = (*callback) (notif, &cf->cf_udata);
@@@ -1803,9 -1703,6 +1806,9 @@@ void plugin_log (int level, const char 
                cf = le->value;
                callback = cf->cf_callback;
  
 +              /* do not switch plugin context; rather keep the context
 +               * (interval) information of the calling plugin */
 +
                (*callback) (level, msg, &cf->cf_udata);
  
                le = le->next;
@@@ -2038,122 -1935,4 +2041,122 @@@ int plugin_notification_meta_free (noti
    return (0);
  } /* int plugin_notification_meta_free */
  
 +static void plugin_ctx_destructor (void *ctx)
 +{
 +      sfree (ctx);
 +} /* void plugin_ctx_destructor */
 +
 +static plugin_ctx_t ctx_init = { /* interval = */ 0 };
 +
 +static plugin_ctx_t *plugin_ctx_create (void)
 +{
 +      plugin_ctx_t *ctx;
 +
 +      ctx = malloc (sizeof (*ctx));
 +      if (ctx == NULL) {
 +              char errbuf[1024];
 +              ERROR ("Failed to allocate plugin context: %s",
 +                              sstrerror (errno, errbuf, sizeof (errbuf)));
 +              return NULL;
 +      }
 +
 +      *ctx = ctx_init;
 +      assert (plugin_ctx_key_initialized);
 +      pthread_setspecific (plugin_ctx_key, ctx);
 +      DEBUG("Created new plugin context.");
 +      return (ctx);
 +} /* int plugin_ctx_create */
 +
 +void plugin_init_ctx (void)
 +{
 +      pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
 +      plugin_ctx_key_initialized = 1;
 +} /* void plugin_init_ctx */
 +
 +plugin_ctx_t plugin_get_ctx (void)
 +{
 +      plugin_ctx_t *ctx;
 +
 +      assert (plugin_ctx_key_initialized);
 +      ctx = pthread_getspecific (plugin_ctx_key);
 +
 +      if (ctx == NULL) {
 +              ctx = plugin_ctx_create ();
 +              /* this must no happen -- exit() instead? */
 +              if (ctx == NULL)
 +                      return ctx_init;
 +      }
 +
 +      return (*ctx);
 +} /* plugin_ctx_t plugin_get_ctx */
 +
 +plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
 +{
 +      plugin_ctx_t *c;
 +      plugin_ctx_t old;
 +
 +      assert (plugin_ctx_key_initialized);
 +      c = pthread_getspecific (plugin_ctx_key);
 +
 +      if (c == NULL) {
 +              c = plugin_ctx_create ();
 +              /* this must no happen -- exit() instead? */
 +              if (c == NULL)
 +                      return ctx_init;
 +      }
 +
 +      old = *c;
 +      *c = ctx;
 +
 +      return (old);
 +} /* void plugin_set_ctx */
 +
 +cdtime_t plugin_get_interval (void)
 +{
 +      cdtime_t interval;
 +
 +      interval = plugin_get_ctx().interval;
 +      if (interval > 0)
 +              return interval;
 +
 +      return cf_get_default_interval ();
 +} /* cdtime_t plugin_get_interval */
 +
 +typedef struct {
 +      plugin_ctx_t ctx;
 +      void *(*start_routine) (void *);
 +      void *arg;
 +} plugin_thread_t;
 +
 +static void *plugin_thread_start (void *arg)
 +{
 +      plugin_thread_t *plugin_thread = arg;
 +
 +      void *(*start_routine) (void *) = plugin_thread->start_routine;
 +      void *plugin_arg = plugin_thread->arg;
 +
 +      plugin_set_ctx (plugin_thread->ctx);
 +
 +      free (plugin_thread);
 +
 +      return start_routine (plugin_arg);
 +} /* void *plugin_thread_start */
 +
 +int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
 +              void *(*start_routine) (void *), void *arg)
 +{
 +      plugin_thread_t *plugin_thread;
 +
 +      plugin_thread = malloc (sizeof (*plugin_thread));
 +      if (plugin_thread == NULL)
 +              return -1;
 +
 +      plugin_thread->ctx           = plugin_get_ctx ();
 +      plugin_thread->start_routine = start_routine;
 +      plugin_thread->arg           = arg;
 +
 +      return pthread_create (thread, attr,
 +                      plugin_thread_start, plugin_thread);
 +} /* int plugin_thread_create */
 +
  /* vim: set sw=8 ts=8 noet fdm=marker : */