rrdcached: examine the current queue with the "QUEUE" command
[rrdtool.git] / src / rrd_daemon.c
index 5231942..d56cf06 100644 (file)
@@ -429,7 +429,7 @@ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
 
   assert(sock != NULL);
 
-  new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
+  new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
   if (new_buf == NULL)
   {
     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
@@ -703,7 +703,7 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
   {
     char **temp;
 
-    temp = (char **) realloc (cfd->keys,
+    temp = (char **) rrd_realloc (cfd->keys,
         sizeof (char *) * (cfd->keys_num + 1));
     if (temp == NULL)
     {
@@ -1076,9 +1076,11 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */
     "FLUSHALL\n"
     "PENDING <filename>\n"
     "FORGET <filename>\n"
+    "QUEUE\n"
     "UPDATE <filename> <values> [<values> ...]\n"
     "BATCH\n"
     "STATS\n"
+    "QUIT\n"
   };
 
   char *help_flush[2] =
@@ -1120,6 +1122,18 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */
     "Any pending updates for the file will be lost.\n"
   };
 
+  char *help_queue[2] =
+  {
+    "Help for QUEUE\n"
+    ,
+    "Shows all files in the output queue.\n"
+    "The output is zero or more lines in the following format:\n"
+    "(where <num_vals> is the number of values to be written)\n"
+    "\n"
+    "<num_vals> <filename>\n"
+    "\n"
+  };
+
   char *help_update[2] =
   {
     "Help for UPDATE\n"
@@ -1167,6 +1181,13 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */
     "For more information, consult the rrdcached(1) documentation.\n"
   };
 
+  char *help_quit[2] =
+  {
+    "Help for QUIT\n"
+    ,
+    "Disconnect from rrdcached.\n"
+  };
+
   status = buffer_get_field (&buffer, &buffer_size, &command);
   if (status != 0)
     help_text = help_help;
@@ -1182,10 +1203,14 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */
       help_text = help_pending;
     else if (strcasecmp (command, "forget") == 0)
       help_text = help_forget;
+    else if (strcasecmp (command, "queue") == 0)
+      help_text = help_queue;
     else if (strcasecmp (command, "stats") == 0)
       help_text = help_stats;
     else if (strcasecmp (command, "batch") == 0)
       help_text = help_batch;
+    else if (strcasecmp (command, "quit") == 0)
+      help_text = help_quit;
     else
       help_text = help_help;
   }
@@ -1373,6 +1398,24 @@ static int handle_request_forget(listen_socket_t *sock, /* {{{ */
   assert(1==0);
 } /* }}} static int handle_request_forget */
 
+static int handle_request_queue (listen_socket_t *sock) /* {{{ */
+{
+  cache_item_t *ci;
+
+  pthread_mutex_lock(&cache_lock);
+
+  ci = cache_queue_head;
+  while (ci != NULL)
+  {
+    add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
+    ci = ci->next;
+  }
+
+  pthread_mutex_unlock(&cache_lock);
+
+  return send_response(sock, RESP_OK, "in queue.\n");
+} /* }}} int handle_request_queue */
+
 static int handle_request_update (listen_socket_t *sock, /* {{{ */
                                   time_t now,
                                   char *buffer, size_t buffer_size)
@@ -1497,7 +1540,7 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */
     else
       ci->last_update_stamp = stamp;
 
-    temp = (char **) realloc (ci->values,
+    temp = (char **) rrd_realloc (ci->values,
         sizeof (char *) * (ci->values_num + 1));
     if (temp == NULL)
     {
@@ -1632,6 +1675,8 @@ static int handle_request (listen_socket_t *sock, /* {{{ */
     return (handle_request_pending(sock, buffer_ptr, buffer_size));
   else if (strcasecmp (command, "forget") == 0)
     return (handle_request_forget(sock, buffer_ptr, buffer_size));
+  else if (strcasecmp (command, "queue") == 0)
+    return (handle_request_queue(sock));
   else if (strcasecmp (command, "stats") == 0)
     return (handle_request_stats (sock));
   else if (strcasecmp (command, "help") == 0)
@@ -1914,7 +1959,7 @@ static void *connection_thread_main (void *args) /* {{{ */
   {
     pthread_t *temp;
 
-    temp = (pthread_t *) realloc (connection_threads,
+    temp = (pthread_t *) rrd_realloc (connection_threads,
         sizeof (pthread_t) * (connection_threads_num + 1));
     if (temp == NULL)
     {
@@ -2017,7 +2062,7 @@ out_close:
 
     connection_threads_num--;
 
-    temp = realloc(connection_threads,
+    temp = rrd_realloc(connection_threads,
                    sizeof(*connection_threads) * connection_threads_num);
     if (connection_threads_num > 0 && temp == NULL)
       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
@@ -2041,7 +2086,7 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
   if (strncmp(path, "unix:", strlen("unix:")) == 0)
     path += strlen("unix:");
 
-  temp = (listen_socket_t *) realloc (listen_fds,
+  temp = (listen_socket_t *) rrd_realloc (listen_fds,
       sizeof (listen_fds[0]) * (listen_fds_num + 1));
   if (temp == NULL)
   {
@@ -2170,7 +2215,7 @@ static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
     listen_socket_t *temp;
     int one = 1;
 
-    temp = (listen_socket_t *) realloc (listen_fds,
+    temp = (listen_socket_t *) rrd_realloc (listen_fds,
         sizeof (listen_fds[0]) * (listen_fds_num + 1));
     if (temp == NULL)
     {
@@ -2516,7 +2561,7 @@ static int read_options (int argc, char **argv) /* {{{ */
         }
         memset(new, 0, sizeof(listen_socket_t));
 
-        temp = (listen_socket_t **) realloc (config_listen_address_list,
+        temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
         if (temp == NULL)
         {