write_http plugin: Create one cURL object for each read-thread. ff/json
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 30 Aug 2009 15:22:28 +0000 (17:22 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 30 Aug 2009 15:22:28 +0000 (17:22 +0200)
src/utils_format_json.c
src/write_http.c

index a919316..fcd1788 100644 (file)
@@ -116,8 +116,6 @@ static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */
 
 #undef BUFFER_ADD
 
-  DEBUG ("format_json: values_to_json: buffer = %s;", buffer);
-
   return (0);
 } /* }}} int values_to_json */
 
@@ -171,8 +169,6 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
 #undef BUFFER_ADD_KEYVAL
 #undef BUFFER_ADD
 
-  DEBUG ("format_json: value_list_to_json: buffer = %s;", buffer);
-
   return (0);
 } /* }}} int value_list_to_json */
 
index f14636b..a11448e 100644 (file)
@@ -54,9 +54,6 @@ struct wh_callback_s
 #define WH_FORMAT_JSON    1
         int format;
 
-        CURL *curl;
-        char curl_errbuf[CURL_ERROR_SIZE];
-
         char   send_buffer[4096];
         size_t send_buffer_free;
         size_t send_buffer_fill;
@@ -66,52 +63,68 @@ struct wh_callback_s
 };
 typedef struct wh_callback_s wh_callback_t;
 
-static void wh_reset_buffer (wh_callback_t *cb)  /* {{{ */
+struct wh_curl_s
 {
-        memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
-        cb->send_buffer_free = sizeof (cb->send_buffer);
-        cb->send_buffer_fill = 0;
-        cb->send_buffer_init_time = time (NULL);
+        CURL *curl;
+        char errbuf[CURL_ERROR_SIZE];
+};
+typedef struct wh_curl_s wh_curl_t;
 
-        if (cb->format == WH_FORMAT_JSON)
-        {
-                format_json_initialize (cb->send_buffer,
-                                &cb->send_buffer_fill,
-                                &cb->send_buffer_free);
-        }
-} /* }}} wh_reset_buffer */
+static pthread_once_t curl_key_init = PTHREAD_ONCE_INIT;
+static pthread_key_t curl_key;
 
-static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
+static void wh_curl_destroy (void *data) /* {{{ */
 {
-        int status = 0;
+        wh_curl_t *c = data;
 
-        curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
-        status = curl_easy_perform (cb->curl);
-        if (status != 0)
-        {
-                ERROR ("write_http plugin: curl_easy_perform failed with "
-                                "status %i: %s",
-                                status, cb->curl_errbuf);
-        }
-        return (status);
-} /* }}} wh_send_buffer */
+        if (c == NULL)
+                return;
+
+        DEBUG ("write_http plugin: Destroying a cURL object.");
+
+        curl_easy_cleanup (c->curl);
+        sfree (c);
+} /* }}} void wh_curl_destroy */
+
+static void wh_curl_init (void) /* {{{ */
+{
+        pthread_key_create(&curl_key, wh_curl_destroy);
+} /* }}} void wh_curl_init */
 
-static int wh_callback_init (wh_callback_t *cb) /* {{{ */
+static wh_curl_t *wh_curl_get (wh_callback_t *cb) /* {{{ */
 {
         struct curl_slist *headers;
+        wh_curl_t *c;
+
+        pthread_once (&curl_key_init, wh_curl_init);
 
-        if (cb->curl != NULL)
-                return (0);
+        c = pthread_getspecific (curl_key);
+        if (c != NULL)
+                return (c);
 
-        cb->curl = curl_easy_init ();
-        if (cb->curl == NULL)
+        DEBUG ("write_http plugin: Creating a cURL object.");
+
+        c = malloc (sizeof (*c));
+        if (c == NULL)
         {
-                ERROR ("curl plugin: curl_easy_init failed.");
-                return (-1);
+                ERROR ("write_http plugin: malloc failed.");
+                return (NULL);
+        }
+        memset (c, 0, sizeof (*c));
+
+        c->curl = curl_easy_init ();
+        if (c->curl == NULL)
+        {
+                ERROR ("write_http plugin: curl_easy_init failed.");
+                sfree (c);
+                return (NULL);
         }
 
-        curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION);
+        curl_easy_setopt (c->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION);
 
+        /* The fields from `cb' we read here are only written to at
+         * configuration time, therefore it's safe to read them without a
+         * lock. */
         headers = NULL;
         headers = curl_slist_append (headers, "Accept:  */*");
         if (cb->format == WH_FORMAT_JSON)
@@ -119,45 +132,66 @@ static int wh_callback_init (wh_callback_t *cb) /* {{{ */
         else
                 headers = curl_slist_append (headers, "Content-Type: text/plain");
         headers = curl_slist_append (headers, "Expect:");
-        curl_easy_setopt (cb->curl, CURLOPT_HTTPHEADER, headers);
+        curl_easy_setopt (c->curl, CURLOPT_HTTPHEADER, headers);
 
-        curl_easy_setopt (cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
-        curl_easy_setopt (cb->curl, CURLOPT_URL, cb->location);
+        curl_easy_setopt (c->curl, CURLOPT_ERRORBUFFER, c->errbuf);
+        curl_easy_setopt (c->curl, CURLOPT_URL, cb->location);
 
-        if (cb->user != NULL)
-        {
-                size_t credentials_size;
+        if (cb->credentials != NULL)
+                curl_easy_setopt (c->curl, CURLOPT_USERPWD, cb->credentials);
 
-                credentials_size = strlen (cb->user) + 2;
-                if (cb->pass != NULL)
-                        credentials_size += strlen (cb->pass);
+        curl_easy_setopt (c->curl, CURLOPT_SSL_VERIFYPEER, cb->verify_peer);
+        curl_easy_setopt (c->curl, CURLOPT_SSL_VERIFYHOST,
+                        cb->verify_host ? 2 : 0);
+        if (cb->cacert != NULL)
+                curl_easy_setopt (c->curl, CURLOPT_CAINFO, cb->cacert);
 
-                cb->credentials = (char *) malloc (credentials_size);
-                if (cb->credentials == NULL)
-                {
-                        ERROR ("curl plugin: malloc failed.");
-                        return (-1);
-                }
+        pthread_setspecific (curl_key, c);
 
-                ssnprintf (cb->credentials, credentials_size, "%s:%s",
-                                cb->user, (cb->pass == NULL) ? "" : cb->pass);
-                curl_easy_setopt (cb->curl, CURLOPT_USERPWD, cb->credentials);
-                curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST);
+        return (c);
+} /* }}} int wh_curl_get */
+
+static void wh_reset_buffer (wh_callback_t *cb)  /* {{{ */
+{
+        memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
+        cb->send_buffer_free = sizeof (cb->send_buffer);
+        cb->send_buffer_fill = 0;
+        cb->send_buffer_init_time = time (NULL);
+
+        if (cb->format == WH_FORMAT_JSON)
+        {
+                format_json_initialize (cb->send_buffer,
+                                &cb->send_buffer_fill,
+                                &cb->send_buffer_free);
         }
+} /* }}} wh_reset_buffer */
 
-        curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYPEER, cb->verify_peer);
-        curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYHOST,
-                        cb->verify_host ? 2 : 0);
-        if (cb->cacert != NULL)
-                curl_easy_setopt (cb->curl, CURLOPT_CAINFO, cb->cacert);
+static int wh_send_buffer (wh_callback_t *cb, /* {{{ */
+                const char *buffer)
+{
+        int status = 0;
+        wh_curl_t *c;
 
-        wh_reset_buffer (cb);
+        c = wh_curl_get (cb);
+        if (c == NULL)
+                return (-1);
 
-        return (0);
-} /* }}} int wh_callback_init */
+        curl_easy_setopt (c->curl, CURLOPT_POSTFIELDS, buffer);
+        status = curl_easy_perform (c->curl);
+        if (status != 0)
+        {
+                ERROR ("write_http plugin: curl_easy_perform failed with "
+                                "status %i: %s",
+                                status, c->errbuf);
+        }
+
+        return (status);
+} /* }}} wh_send_buffer */
 
+/* You must hold cb->send_lock when entering `wh_flush_nolock'. */
 static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
 {
+        char buffer[sizeof (cb->send_buffer)];
         int status;
 
         DEBUG ("write_http plugin: wh_flush_nolock: timeout = %i; "
@@ -173,6 +207,7 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
                         return (0);
         }
 
+        /* Finalize the send buffer and copy it to `buffer'. */
         if (cb->format == WH_FORMAT_COMMAND)
         {
                 if (cb->send_buffer_fill <= 0)
@@ -181,7 +216,7 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
                         return (0);
                 }
 
-                status = wh_send_buffer (cb);
+                memcpy (buffer, cb->send_buffer, sizeof (buffer));
                 wh_reset_buffer (cb);
         }
         else if (cb->format == WH_FORMAT_JSON)
@@ -203,7 +238,7 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
                         return (status);
                 }
 
-                status = wh_send_buffer (cb);
+                memcpy (buffer, cb->send_buffer, sizeof (buffer));
                 wh_reset_buffer (cb);
         }
         else
@@ -214,6 +249,14 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
                 return (-1);
         }
 
+        /* We copied the send buffer to `buffer' and reset it so we can do
+         * without the `send_lock' here. This allows other read-threads to
+         * append stuff to the new buffer while we wait for the web-server to
+         * reply. */
+        pthread_mutex_unlock (&cb->send_lock);
+        status = wh_send_buffer (cb, buffer);
+        pthread_mutex_lock (&cb->send_lock);
+
         return (status);
 } /* }}} wh_flush_nolock */
 
@@ -230,18 +273,6 @@ static int wh_flush (int timeout, /* {{{ */
         cb = user_data->data;
 
         pthread_mutex_lock (&cb->send_lock);
-
-        if (cb->curl == NULL)
-        {
-                status = wh_callback_init (cb);
-                if (status != 0)
-                {
-                        ERROR ("write_http plugin: wh_callback_init failed.");
-                        pthread_mutex_unlock (&cb->send_lock);
-                        return (-1);
-                }
-        }
-
         status = wh_flush_nolock (timeout, cb);
         pthread_mutex_unlock (&cb->send_lock);
 
@@ -259,7 +290,6 @@ static void wh_callback_free (void *data) /* {{{ */
 
         wh_flush_nolock (/* timeout = */ -1, cb);
 
-        curl_easy_cleanup (cb->curl);
         sfree (cb->location);
         sfree (cb->user);
         sfree (cb->pass);
@@ -361,17 +391,6 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{
 
         pthread_mutex_lock (&cb->send_lock);
 
-        if (cb->curl == NULL)
-        {
-                status = wh_callback_init (cb);
-                if (status != 0)
-                {
-                        ERROR ("write_http plugin: wh_callback_init failed.");
-                        pthread_mutex_unlock (&cb->send_lock);
-                        return (-1);
-                }
-        }
-
         if (command_len >= cb->send_buffer_free)
         {
                 status = wh_flush_nolock (/* timeout = */ -1, cb);
@@ -409,17 +428,6 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ *
 
         pthread_mutex_lock (&cb->send_lock);
 
-        if (cb->curl == NULL)
-        {
-                status = wh_callback_init (cb);
-                if (status != 0)
-                {
-                        ERROR ("write_http plugin: wh_callback_init failed.");
-                        pthread_mutex_unlock (&cb->send_lock);
-                        return (-1);
-                }
-        }
-
         status = format_json_value_list (cb->send_buffer,
                         &cb->send_buffer_fill,
                         &cb->send_buffer_free,
@@ -565,7 +573,6 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */
         cb->verify_host = 1;
         cb->cacert = NULL;
         cb->format = WH_FORMAT_COMMAND;
-        cb->curl = NULL;
 
         pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
 
@@ -599,6 +606,27 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */
         DEBUG ("write_http: Registering write callback with URL %s",
                         cb->location);
 
+        if (cb->user != NULL)
+        {
+                size_t credentials_size;
+
+                credentials_size = strlen (cb->user) + 2;
+                if (cb->pass != NULL)
+                        credentials_size += strlen (cb->pass);
+
+                cb->credentials = (char *) malloc (credentials_size);
+                if (cb->credentials == NULL)
+                {
+                        ERROR ("write_http plugin: malloc failed.");
+                        return (-1);
+                }
+
+                ssnprintf (cb->credentials, credentials_size, "%s:%s",
+                                cb->user, (cb->pass == NULL) ? "" : cb->pass);
+        }
+
+        wh_reset_buffer (cb);
+
         memset (&user_data, 0, sizeof (user_data));
         user_data.data = cb;
         user_data.free_func = NULL;