X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=17dca62bd53ad8889c5e2fab579f11f29a74ae65;hb=4d5b22c20f6ae93aa53f37b12757a6a64cd63b05;hp=7a51f5d0545e1a7b5f428c474aea152c596ded10;hpb=2154be5fab4197fdfa551b5a46cc3b12c4571d2a;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 7a51f5d..17dca62 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -166,8 +166,8 @@ static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; -static uint64_t stats_updates_total = 0; -static uint64_t stats_values_total = 0; +static uint64_t stats_updates_written = 0; +static uint64_t stats_data_sets_written = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* @@ -223,6 +223,84 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ +static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (fd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + return (0); + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + assert (buffer_used > 0); + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + + /* Fix network line endings. */ + if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + { + buffer_used--; + buffer[buffer_used - 1] = 0; + } + + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (fd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -232,9 +310,6 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ { int did_insert = 0; - RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", - ci->file); - if (ci == NULL) return (-1); @@ -471,9 +546,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", - file, values_num, (void *) values); - status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { @@ -487,8 +559,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ free (values[i]); pthread_mutex_lock (&stats_lock); - stats_updates_total++; - stats_values_total += values_num; + stats_updates_written++; + stats_data_sets_written += values_num; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); @@ -496,8 +568,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* while (do_shutdown == 0) */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); - return (NULL); } /* }}} void *queue_thread_main */ @@ -687,11 +757,11 @@ static int handle_request_help (int fd, /* {{{ */ for (i = 0; i < help_text_len; i++) { - status = write (fd, help_text[i], strlen (help_text[i])); + status = swrite (fd, help_text[i], strlen (help_text[i])); if (status < 0) { status = errno; - RRDD_LOG (LOG_ERR, "handle_request_help: write(2) returned an error."); + RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error."); return (status); } } @@ -707,30 +777,30 @@ static int handle_request_stats (int fd, /* {{{ */ char outbuf[4096]; uint64_t copy_queue_length; - uint64_t copy_updates_total; - uint64_t copy_values_total; + uint64_t copy_updates_written; + uint64_t copy_data_sets_written; - uint64_t tree_nodes; + uint64_t tree_nodes_number; uint64_t tree_depth; pthread_mutex_lock (&stats_lock); - copy_queue_length = stats_queue_length; - copy_updates_total = stats_updates_total; - copy_values_total = stats_values_total; + copy_queue_length = stats_queue_length; + copy_updates_written = stats_updates_written; + copy_data_sets_written = stats_data_sets_written; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); - tree_nodes = (uint64_t) g_tree_nnodes (cache_tree); - tree_depth = (uint64_t) g_tree_height (cache_tree); + tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree); + tree_depth = (uint64_t) g_tree_height (cache_tree); pthread_mutex_unlock (&cache_lock); #define RRDD_STATS_SEND \ outbuf[sizeof (outbuf) - 1] = 0; \ - status = write (fd, outbuf, strlen (outbuf)); \ + status = swrite (fd, outbuf, strlen (outbuf)); \ if (status < 0) \ { \ status = errno; \ - RRDD_LOG (LOG_INFO, "handle_request_stats: write(2) returned an error."); \ + RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \ return (status); \ } @@ -742,15 +812,15 @@ static int handle_request_stats (int fd, /* {{{ */ RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), - "UpdatesWritten: %"PRIu64"\n", copy_updates_total); + "UpdatesWritten: %"PRIu64"\n", copy_updates_written); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), - "ValuesWritten: %"PRIu64"\n", copy_values_total); + "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), - "TreeNodesNumber: %"PRIu64"\n", tree_nodes); + "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), @@ -758,6 +828,7 @@ static int handle_request_stats (int fd, /* {{{ */ RRDD_STATS_SEND; return (0); +#undef RRDD_STATS_SEND } /* }}} int handle_request_stats */ static int handle_request_flush (int fd, /* {{{ */ @@ -786,11 +857,11 @@ static int handle_request_flush (int fd, /* {{{ */ } result[sizeof (result) - 1] = 0; - status = write (fd, result, strlen (result)); + status = swrite (fd, result, strlen (result)); if (status < 0) { status = errno; - RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error."); + RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error."); return (status); } @@ -809,13 +880,25 @@ static int handle_request_update (int fd, /* {{{ */ cache_item_t *ci; char answer[4096]; +#define RRDD_UPDATE_SEND \ + answer[sizeof (answer) - 1] = 0; \ + status = swrite (fd, answer, strlen (answer)); \ + if (status < 0) \ + { \ + status = errno; \ + RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \ + return (status); \ + } + now = time (NULL); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) { - RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name."); - return (-1); + strncpy (answer, "-1 Usage: UPDATE [ ...]\n", + sizeof (answer)); + RRDD_UPDATE_SEND; + return (0); } pthread_mutex_lock (&cache_lock); @@ -828,7 +911,10 @@ static int handle_request_update (int fd, /* {{{ */ { pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); - return (-1); + + strncpy (answer, "-1 malloc failed.\n", sizeof (answer)); + RRDD_UPDATE_SEND; + return (0); } memset (ci, 0, sizeof (cache_item_t)); @@ -836,9 +922,12 @@ static int handle_request_update (int fd, /* {{{ */ if (ci->file == NULL) { pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); free (ci); - return (-1); + RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + + strncpy (answer, "-1 strdup failed.\n", sizeof (answer)); + RRDD_UPDATE_SEND; + return (0); } ci->values = NULL; @@ -847,9 +936,6 @@ static int handle_request_update (int fd, /* {{{ */ ci->flags = CI_FLAGS_IN_TREE; g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); - - RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", - ci->file); } /* }}} */ assert (ci != NULL); @@ -895,18 +981,18 @@ static int handle_request_update (int fd, /* {{{ */ pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); - answer[sizeof (answer) - 1] = 0; - - status = write (fd, answer, strlen (answer)); - if (status < 0) + if (values_num < 1) { - status = errno; - RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error."); - return (status); + strncpy (answer, "-1 No values updated.\n", sizeof (answer)); } - + else + { + snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num, + (values_num == 1) ? "" : "s"); + } + RRDD_UPDATE_SEND; return (0); +#undef RRDD_UPDATE_SEND } /* }}} int handle_request_update */ static int handle_request (int fd) /* {{{ */ @@ -917,31 +1003,19 @@ static int handle_request (int fd) /* {{{ */ char *command; int status; - status = read (fd, buffer, sizeof (buffer)); + status = (int) sread (fd, buffer, sizeof (buffer)); if (status == 0) { return (1); } else if (status < 0) { - RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); - return (-1); - } - buffer_size = status; - assert (((size_t) buffer_size) <= sizeof (buffer)); - - if (buffer[buffer_size - 1] != '\n') - { - RRDD_LOG (LOG_INFO, "handle_request: malformed request."); + RRDD_LOG (LOG_ERR, "handle_request: sread failed."); return (-1); } - - /* Accept Windows style line endings, too */ - if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r')) - { - buffer_size--; - buffer[buffer_size - 1] = '\n'; - } + buffer_size = (size_t) status; + assert (buffer_size <= sizeof (buffer)); + assert (buffer[buffer_size - 1] == 0); /* Place the normal field separator at the end to simplify * `buffer_get_field's work. */ @@ -979,10 +1053,10 @@ static int handle_request (int fd) /* {{{ */ snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command); result[sizeof (result) - 1] = 0; - status = write (fd, result, strlen (result)); + status = swrite (fd, result, strlen (result)); if (status < 0) { - RRDD_LOG (LOG_ERR, "handle_request: write(2) failed."); + RRDD_LOG (LOG_ERR, "handle_request: swrite failed."); return (-1); } } @@ -1244,11 +1318,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ int i; for (i = 0; i < config_listen_address_list_len; i++) - { - RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] " - "= %s", i, config_listen_address_list[i]); open_listen_socket (config_listen_address_list[i]); - } if (config_listen_address_list_len < 1) open_listen_socket (RRDCACHED_DEFAULT_ADDRESS); @@ -1296,6 +1366,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ struct sockaddr_storage client_sa; socklen_t client_sa_size; pthread_t tid; + pthread_attr_t attr; if (pollfds[i].revents == 0) continue; @@ -1324,7 +1395,10 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } - status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + + status = pthread_create (&tid, &attr, connection_thread_main, /* args = */ (void *) client_sd); if (status != 0) { @@ -1351,8 +1425,6 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } pthread_mutex_unlock (&connetion_threads_lock); - RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting."); - return (NULL); } /* }}} void *listen_thread_main */ @@ -1409,7 +1481,7 @@ static int daemonize (void) /* {{{ */ memset (&sa_term, 0, sizeof (sa_term)); sa_term.sa_handler = sig_term_handler; - sigaction (SIGINT, &sa_term, NULL); + sigaction (SIGTERM, &sa_term, NULL); memset (&sa_pipe, 0, sizeof (sa_pipe)); sa_pipe.sa_handler = SIG_IGN; @@ -1440,14 +1512,10 @@ static int daemonize (void) /* {{{ */ static int cleanup (void) /* {{{ */ { - RRDD_LOG (LOG_DEBUG, "cleanup ()"); - do_shutdown++; - RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread.."); pthread_cond_signal (&cache_cond); pthread_join (queue_thread, /* return = */ NULL); - RRDD_LOG (LOG_DEBUG, "cleanup: done"); remove_pidfile ();