X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=3163e1865629abe2879f0bcaeb5dd583f202c66f;hb=28e399e75eb5a26a5ab1b65b7b18b4cfd69e263d;hp=a5d95bfc5c17cd22c1023cc04e22f2686a2d0366;hpb=1f6892fdd9a34a76a6c028f5f725b5a2d717d966;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index a5d95bf..3163e18 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -91,11 +91,6 @@ # define __attribute__(x) /**/ #endif -#if 0 -/* FIXME: I don't want to declare strdup myself! */ -extern char *strdup (const char *s); -#endif - /* * Types */ @@ -121,6 +116,21 @@ struct cache_item_s cache_item_t *next; }; +struct callback_flush_data_s +{ + time_t now; + char **keys; + size_t keys_num; +}; +typedef struct callback_flush_data_s callback_flush_data_t; + +enum queue_side_e +{ + HEAD, + TAIL +}; +typedef enum queue_side_e queue_side_t; + /* * Variables */ @@ -142,8 +152,12 @@ static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; +static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; + static int config_write_interval = 300; static int config_flush_interval = 3600; +static char *config_pid_file = NULL; +static char *config_base_dir = NULL; static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; @@ -161,28 +175,113 @@ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ do_shutdown++; } /* }}} void sig_term_handler */ +static int write_pidfile (void) /* {{{ */ +{ + pid_t pid; + char *file; + FILE *fh; + + pid = getpid (); + + file = (config_pid_file != NULL) + ? config_pid_file + : LOCALSTATEDIR "/run/rrdcached.pid"; + + fh = fopen (file, "w"); + if (fh == NULL) + { + RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + return (-1); + } + + fprintf (fh, "%i\n", (int) pid); + fclose (fh); + + return (0); +} /* }}} int write_pidfile */ + +static int remove_pidfile (void) /* {{{ */ +{ + char *file; + int status; + + file = (config_pid_file != NULL) + ? config_pid_file + : LOCALSTATEDIR "/run/rrdcached.pid"; + + status = unlink (file); + if (status == 0) + return (0); + return (errno); +} /* }}} int remove_pidfile */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! */ -static int enqueue_cache_item (cache_item_t *ci) /* {{{ */ +static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ + queue_side_t side) { - RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", + RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", ci->file); if (ci == NULL) return (-1); - if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) - return (-1); + if (ci->values_num == 0) + return (0); - assert (ci->next == NULL); + if (side == HEAD) + { + if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + { + assert (ci->next == NULL); + ci->next = cache_queue_head; + cache_queue_head = ci; - if (cache_queue_tail == NULL) - cache_queue_head = ci; - else - cache_queue_tail->next = ci; - cache_queue_tail = ci; + if (cache_queue_tail == NULL) + cache_queue_tail = cache_queue_head; + } + else if (cache_queue_head == ci) + { + /* do nothing */ + } + else /* enqueued, but not first entry */ + { + cache_item_t *prev; + + /* find previous entry */ + for (prev = cache_queue_head; prev != NULL; prev = prev->next) + if (prev->next == ci) + break; + assert (prev != NULL); + + /* move to the front */ + prev->next = ci->next; + ci->next = cache_queue_head; + cache_queue_head = ci; + + /* check if we need to adapt the tail */ + if (cache_queue_tail == ci) + cache_queue_tail = prev; + } + } + else /* (side == TAIL) */ + { + /* We don't move values back in the list.. */ + if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + return (0); + + assert (ci->next == NULL); + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + } + + ci->flags |= CI_FLAGS_IN_QUEUE; return (0); } /* }}} int enqueue_cache_item */ @@ -192,22 +291,42 @@ static int enqueue_cache_item (cache_item_t *ci) /* {{{ */ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held * while this is in progress. */ -static gboolean tree_callback_flush (gpointer key /* {{{ */ - __attribute__((unused)), gpointer value, gpointer data) +static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ + gpointer data) { cache_item_t *ci; - time_t now; - - key = NULL; /* make compiler happy */ + callback_flush_data_t *cfd; ci = (cache_item_t *) value; - now = *((time_t *) data); + cfd = (callback_flush_data_t *) data; - if (((now - ci->last_flush_time) >= config_write_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) - enqueue_cache_item (ci); + if (((cfd->now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + } + else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num <= 0)) + { + char **temp; + + temp = (char **) realloc (cfd->keys, + sizeof (char *) * (cfd->keys_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed."); + return (FALSE); + } + cfd->keys = temp; + /* Make really sure this points to the _same_ place */ + assert ((char *) key == ci->file); + cfd->keys[cfd->keys_num] = (char *) key; + cfd->keys_num++; + } - return (TRUE); + return (FALSE); } /* }}} gboolean tree_callback_flush */ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ @@ -235,13 +354,48 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { - time_t time_now; + callback_flush_data_t cfd; + size_t k; + memset (&cfd, 0, sizeof (cfd)); /* Pass the current time as user data so that we don't need to call * `time' for each node. */ - time_now = time (NULL); + cfd.now = time (NULL); + cfd.keys = NULL; + cfd.keys_num = 0; + + /* `tree_callback_flush' will return the keys of all values that haven't + * been touched in the last `config_flush_interval' seconds in `cfd'. + * The char*'s in this array point to the same memory as ci->file, so we + * don't need to free them separately. */ + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); - g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now); + for (k = 0; k < cfd.keys_num; k++) + { + /* This must not fail. */ + ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); + assert (ci != NULL); + + /* If we end up here with values available, something's seriously + * messed up. */ + assert (ci->values_num == 0); + + /* Remove the node from the tree */ + g_tree_remove (cache_tree, cfd.keys[k]); + cfd.keys[k] = NULL; + + /* Now free and clean up `ci'. */ + free (ci->file); + ci->file = NULL; + free (ci); + ci = NULL; + } /* for (k = 0; k < cfd.keys_num; k++) */ + + if (cfd.keys != NULL) + { + free (cfd.keys); + cfd.keys = NULL; + } /* Determine the time of the next cache flush. */ while (next_flush.tv_sec < now.tv_sec) @@ -307,6 +461,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ free (values[i]); pthread_mutex_lock (&cache_lock); + pthread_cond_broadcast (&flush_cond); } /* while (do_shutdown == 0) */ pthread_mutex_unlock (&cache_lock); @@ -315,12 +470,143 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ return (NULL); } /* }}} void *queue_thread_main */ +static int buffer_get_field (char **buffer_ret, /* {{{ */ + size_t *buffer_size_ret, char **field_ret) +{ + char *buffer; + size_t buffer_pos; + size_t buffer_size; + char *field; + size_t field_size; + int status; + + buffer = *buffer_ret; + buffer_pos = 0; + buffer_size = *buffer_size_ret; + field = *buffer_ret; + field_size = 0; + + /* This is ensured by `handle_request'. */ + assert (buffer[buffer_size - 1] == ' '); + + status = -1; + while (buffer_pos < buffer_size) + { + /* Check for end-of-field or end-of-buffer */ + if (buffer[buffer_pos] == ' ') + { + field[field_size] = 0; + field_size++; + buffer_pos++; + status = 0; + break; + } + /* Handle escaped characters. */ + else if (buffer[buffer_pos] == '\\') + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer_pos++; + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + /* Normal operation */ + else + { + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (status); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + *field_ret = field; + + return (0); +} /* }}} int buffer_get_field */ + +static int flush_file (const char *filename) /* {{{ */ +{ + cache_item_t *ci; + + pthread_mutex_lock (&cache_lock); + + ci = (cache_item_t *) g_tree_lookup (cache_tree, filename); + if (ci == NULL) + { + pthread_mutex_unlock (&cache_lock); + return (ENOENT); + } + + /* Enqueue at head */ + enqueue_cache_item (ci, HEAD); + pthread_cond_signal (&cache_cond); + + while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + { + ci = NULL; + + pthread_cond_wait (&flush_cond, &cache_lock); + + ci = g_tree_lookup (cache_tree, filename); + if (ci == NULL) + { + RRDD_LOG (LOG_ERR, "flush_file: Tree node went away " + "while waiting for flush."); + pthread_mutex_unlock (&cache_lock); + return (-1); + } + } + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* }}} int flush_file */ + +static int handle_request_flush (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + char *file; + int status; + char result[4096]; + + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name."); + return (-1); + } + + status = flush_file (file); + if (status == 0) + snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file); + else if (status == ENOENT) + snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + else if (status < 0) + strncpy (result, "-1 Internal error.\n", sizeof (result)); + else + snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status); + result[sizeof (result) - 1] = 0; + + status = write (fd, result, strlen (result)); + if (status < 0) + { + status = errno; + RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error."); + return (status); + } + + return (0); +} /* }}} int handle_request_flush */ + static int handle_request_update (int fd, /* {{{ */ - char *buffer, int buffer_size) + char *buffer, size_t buffer_size) { char *file; - char *value; - char *buffer_ptr; int values_num = 0; int status; @@ -331,15 +617,17 @@ static int handle_request_update (int fd, /* {{{ */ now = time (NULL); - buffer_ptr = buffer; - - file = buffer_ptr; - buffer_ptr += strlen (file) + 1; + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name."); + return (-1); + } pthread_mutex_lock (&cache_lock); ci = g_tree_lookup (cache_tree, file); - if (ci == NULL) + if (ci == NULL) /* {{{ */ { ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) @@ -368,15 +656,20 @@ static int handle_request_update (int fd, /* {{{ */ RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", ci->file); - } + } /* }}} */ assert (ci != NULL); - while (*buffer_ptr != 0) + while (buffer_size > 0) { char **temp; + char *value; - value = buffer_ptr; - buffer_ptr += strlen (value) + 1; + status = buffer_get_field (&buffer, &buffer_size, &value); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field."); + break; + } temp = (char **) realloc (ci->values, sizeof (char *) * (ci->values_num + 1)); @@ -399,9 +692,10 @@ static int handle_request_update (int fd, /* {{{ */ } if (((now - ci->last_flush_time) >= config_write_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) { - enqueue_cache_item (ci); + enqueue_cache_item (ci, TAIL); pthread_cond_signal (&cache_cond); } @@ -410,7 +704,7 @@ static int handle_request_update (int fd, /* {{{ */ snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); answer[sizeof (answer) - 1] = 0; - status = write (fd, answer, sizeof (answer)); + status = write (fd, answer, strlen (answer)); if (status < 0) { status = errno; @@ -424,29 +718,57 @@ static int handle_request_update (int fd, /* {{{ */ static int handle_request (int fd) /* {{{ */ { char buffer[4096]; - int buffer_size; + size_t buffer_size; + char *buffer_ptr; + char *command; + int status; - buffer_size = read (fd, buffer, sizeof (buffer)); - if (buffer_size < 1) + status = read (fd, buffer, sizeof (buffer)); + if (status == 0) + { + return (1); + } + else if (status < 0) { RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); return (-1); } + buffer_size = status; assert (((size_t) buffer_size) <= sizeof (buffer)); - if ((buffer[buffer_size - 2] != 0) - || (buffer[buffer_size - 1] != 0)) + if (buffer[buffer_size - 1] != '\n') { RRDD_LOG (LOG_INFO, "handle_request: malformed request."); return (-1); } - /* fields in the buffer a separated by null bytes. */ - if (strcmp (buffer, "update") == 0) + /* Accept Windows style line endings, too */ + if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r')) + { + buffer_size--; + buffer[buffer_size - 1] = '\n'; + } + + /* Place the normal field separator at the end to simplify + * `buffer_get_field's work. */ + buffer[buffer_size - 1] = ' '; + + buffer_ptr = buffer; + command = NULL; + status = buffer_get_field (&buffer_ptr, &buffer_size, &command); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request: Unable parse command."); + return (-1); + } + + if (strcmp (command, "update") == 0) + { + return (handle_request_update (fd, buffer_ptr, buffer_size)); + } + else if (strcmp (command, "flush") == 0) { - int offset = strlen ("update") + 1; - return (handle_request_update (fd, buffer + offset, - buffer_size - offset)); + return (handle_request_flush (fd, buffer_ptr, buffer_size)); } else { @@ -628,7 +950,7 @@ static int open_listen_socket (const char *addr) /* {{{ */ ai_hints.ai_socktype = SOCK_STREAM; ai_res = NULL; - status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res); + status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); if (status != 0) { RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: " @@ -716,7 +1038,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } if (config_listen_address_list_len < 1) - open_listen_socket (RRDD_SOCK_PATH); + open_listen_socket (RRDCACHED_DEFAULT_ADDRESS); if (listen_fds_num < 1) { @@ -825,6 +1147,13 @@ static int daemonize (void) /* {{{ */ { pid_t child; int status; + char *base_dir; + + /* These structures are static, because `sigaction' behaves weird if the are + * overwritten.. */ + static struct sigaction sa_int; + static struct sigaction sa_term; + static struct sigaction sa_pipe; child = fork (); if (child < 0) @@ -838,7 +1167,15 @@ static int daemonize (void) /* {{{ */ } /* Change into the /tmp directory. */ - chdir ("/tmp"); + base_dir = (config_base_dir != NULL) + ? config_base_dir + : "/tmp"; + status = chdir (base_dir); + if (status != 0) + { + fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir); + return (-1); + } /* Become session leader */ setsid (); @@ -852,23 +1189,20 @@ static int daemonize (void) /* {{{ */ dup (0); dup (0); - { - struct sigaction sa; - - memset (&sa, 0, sizeof (sa)); - sa.sa_handler = sig_int_handler; - sigaction (SIGINT, &sa, NULL); + /* Install signal handlers */ + memset (&sa_int, 0, sizeof (sa_int)); + sa_int.sa_handler = sig_int_handler; + sigaction (SIGINT, &sa_int, NULL); - memset (&sa, 0, sizeof (sa)); - sa.sa_handler = sig_term_handler; - sigaction (SIGINT, &sa, NULL); + memset (&sa_term, 0, sizeof (sa_term)); + sa_term.sa_handler = sig_term_handler; + sigaction (SIGINT, &sa_term, NULL); - memset (&sa, 0, sizeof (sa)); - sa.sa_handler = SIG_IGN; - sigaction (SIGPIPE, &sa, NULL); - } + memset (&sa_pipe, 0, sizeof (sa_pipe)); + sa_pipe.sa_handler = SIG_IGN; + sigaction (SIGPIPE, &sa_pipe, NULL); - openlog ("rrdd", LOG_PID, LOG_DAEMON); + openlog ("rrdcached", LOG_PID, LOG_DAEMON); cache_tree = g_tree_new ((GCompareFunc) strcmp); if (cache_tree == NULL) @@ -886,6 +1220,8 @@ static int daemonize (void) /* {{{ */ return (-1); } + write_pidfile (); + return (0); } /* }}} int daemonize */ @@ -900,6 +1236,8 @@ static int cleanup (void) /* {{{ */ pthread_join (queue_thread, /* return = */ NULL); RRDD_LOG (LOG_DEBUG, "cleanup: done"); + remove_pidfile (); + closelog (); return (0); @@ -910,7 +1248,7 @@ static int read_options (int argc, char **argv) /* {{{ */ int option; int status = 0; - while ((option = getopt(argc, argv, "l:f:w:h?")) != -1) + while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1) { switch (option) { @@ -967,20 +1305,63 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'b': + { + size_t len; + + if (config_base_dir != NULL) + free (config_base_dir); + config_base_dir = strdup (optarg); + if (config_base_dir == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (3); + } + + len = strlen (config_base_dir); + while ((len > 0) && (config_base_dir[len - 1] == '/')) + { + config_base_dir[len - 1] = 0; + len--; + } + + if (len < 1) + { + fprintf (stderr, "Invalid base directory: %s\n", optarg); + return (4); + } + } + break; + + case 'p': + { + if (config_pid_file != NULL) + free (config_pid_file); + config_pid_file = strdup (optarg); + if (config_pid_file == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (3); + } + } + break; + case 'h': case '?': printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" "\n" - "Usage: rrdd [options]\n" + "Usage: rrdcached [options]\n" "\n" "Valid options are:\n" " -l
Socket address to listen to.\n" " -w Interval in which to write data.\n" " -f Interval in which to flush dead data.\n" + " -p Location of the PID-file.\n" + " -b Base directory to change to.\n" "\n" "For more information and a detailed description of all options " "please refer\n" - "to the rrdd(1) manual page.\n", + "to the rrdcached(1) manual page.\n", VERSION); status = -1; break;