X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=dcca55d755f53d52513d3e1ba1f9c3a08e5d76bd;hp=5c47ac1fd7b3908edce814a15631c1768e571442;hb=18ff159c38b7d919bafbdcc46d03c3f83c17476f;hpb=001e472141c96137de1b5267f8de7fb91e204ab7 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 5c47ac1..dcca55d 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -62,23 +62,37 @@ /* * Now for some includes.. */ -#include "rrd.h" /* {{{ */ +/* {{{ */ +#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H) +#include "../win32/config.h" +#else +#ifdef HAVE_CONFIG_H +#include "../rrd_config.h" +#endif +#endif + +#include "rrd.h" #include "rrd_client.h" #include + +#ifndef WIN32 #include -#include #include -#include #include -#include #include +# include + +#else + +#endif +#include +#include #include #include #include #include -#include #include #include #include @@ -130,6 +144,31 @@ struct listen_socket_s }; typedef struct listen_socket_s listen_socket_t; +struct command; +/* note: guard against "unused" warnings in the handlers */ +#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ + time_t now __attribute__((unused)),\ + char *buffer __attribute__((unused)),\ + size_t buffer_size __attribute__((unused)) + +#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\ + DISPATCH_PROTO + +struct command { + char *cmd; + int (*handler)(HANDLER_PROTO); + socket_privilege min_priv; + + char context; /* where we expect to see it */ +#define CMD_CONTEXT_CLIENT (1<<0) +#define CMD_CONTEXT_BATCH (1<<1) +#define CMD_CONTEXT_JOURNAL (1<<2) +#define CMD_CONTEXT_ANY (0x7f) + + char *syntax; + char *help; +}; + struct cache_item_s; typedef struct cache_item_s cache_item_t; struct cache_item_s @@ -178,10 +217,15 @@ static size_t listen_fds_num = 0; static int do_shutdown = 0; -static pthread_t queue_thread; +static pthread_t *queue_threads; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; +static int config_queue_threads = 4; + +static pthread_t flush_thread; +static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; -static pthread_t *connection_threads = NULL; static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER; static int connection_threads_num = 0; /* Cache stuff */ @@ -189,7 +233,6 @@ static GTree *cache_tree = NULL; static cache_item_t *cache_queue_head = NULL; static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; static int config_write_jitter = 0; @@ -221,6 +264,9 @@ static int journal_write(char *cmd, char *args); static void journal_done(void); static void journal_rotate(void); +/* prototypes for forward refernces */ +static int handle_request_help (HANDLER_PROTO); + /* * Functions */ @@ -228,7 +274,8 @@ static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); do_shutdown++; - pthread_cond_broadcast(&cache_cond); + pthread_cond_broadcast(&flush_cond); + pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ @@ -286,7 +333,7 @@ static void install_signal_handlers(void) /* {{{ */ } /* }}} void install_signal_handlers */ -static int open_pidfile(void) /* {{{ */ +static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; char *file; @@ -295,14 +342,52 @@ static int open_pidfile(void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) - fprintf(stderr, "FATAL: cannot create '%s' (%s)\n", - file, rrd_strerror(errno)); + fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", + action, file, rrd_strerror(errno)); return(fd); } /* }}} static int open_pidfile */ +/* check existing pid file to see whether a daemon is running */ +static int check_pidfile(void) +{ + int pid_fd; + pid_t pid; + char pid_str[16]; + + pid_fd = open_pidfile("open", O_RDWR); + if (pid_fd < 0) + return pid_fd; + + if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0) + return -1; + + pid = atoi(pid_str); + if (pid <= 0) + return -1; + + /* another running process that we can signal COULD be + * a competing rrdcached */ + if (pid != getpid() && kill(pid, 0) == 0) + { + fprintf(stderr, + "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid); + close(pid_fd); + return -1; + } + + lseek(pid_fd, 0, SEEK_SET); + ftruncate(pid_fd, 0); + + fprintf(stderr, + "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" + "rrdcached: starting normally.\n", pid); + + return pid_fd; +} /* }}} static int check_pidfile */ + static int write_pidfile (int fd) /* {{{ */ { pid_t pid; @@ -382,7 +467,7 @@ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ assert(sock != NULL); - new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1); + new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1); if (new_buf == NULL) { RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed"); @@ -409,7 +494,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ va_start(argp, fmt); #ifdef HAVE_VSNPRINTF - len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp); + len = vsnprintf(buffer, sizeof(buffer), fmt, argp); #else len = vsprintf(buffer, fmt, argp); #endif @@ -467,7 +552,7 @@ static int send_response (listen_socket_t *sock, response_code rc, rclen = sprintf(buffer, "%d ", lines); va_start(argp, fmt); #ifdef HAVE_VSNPRINTF - len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp); + len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp); #else len = vsprintf(buffer+rclen, fmt, argp); #endif @@ -526,6 +611,7 @@ static void wipe_ci_values(cache_item_t *ci, time_t when) static void remove_from_queue(cache_item_t *ci) /* {{{ */ { if (ci == NULL) return; + if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */ if (ci->prev == NULL) cache_queue_head = ci->next; /* reset head */ @@ -539,20 +625,21 @@ static void remove_from_queue(cache_item_t *ci) /* {{{ */ ci->next = ci->prev = NULL; ci->flags &= ~CI_FLAGS_IN_QUEUE; + + pthread_mutex_lock (&stats_lock); + assert (stats_queue_length > 0); + stats_queue_length--; + pthread_mutex_unlock (&stats_lock); + } /* }}} static void remove_from_queue */ -/* remove an entry from the tree and free all its resources. - * must hold 'cache lock' while calling this. - * returns 0 on success, otherwise errno */ -static int forget_file(const char *file) +/* free the resources associated with the cache_item_t + * must hold cache_lock when calling this function + */ +static void *free_cache_item(cache_item_t *ci) /* {{{ */ { - cache_item_t *ci; + if (ci == NULL) return NULL; - ci = g_tree_lookup(cache_tree, file); - if (ci == NULL) - return ENOENT; - - g_tree_remove (cache_tree, file); remove_from_queue(ci); for (int i=0; i < ci->values_num; i++) @@ -566,8 +653,8 @@ static int forget_file(const char *file) free (ci); - return 0; -} /* }}} static int forget_file */ + return NULL; +} /* }}} static void *free_cache_item */ /* * enqueue_cache_item: @@ -587,9 +674,8 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ if (cache_queue_head == ci) return 0; - /* remove from the double linked list */ - if (ci->flags & CI_FLAGS_IN_QUEUE) - remove_from_queue(ci); + /* remove if further down in queue */ + remove_from_queue(ci); ci->prev = NULL; ci->next = cache_queue_head; @@ -621,7 +707,7 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ ci->flags |= CI_FLAGS_IN_QUEUE; - pthread_cond_broadcast(&cache_cond); + pthread_cond_signal(&queue_cond); pthread_mutex_lock (&stats_lock); stats_queue_length++; pthread_mutex_unlock (&stats_lock); @@ -631,7 +717,7 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ /* * tree_callback_flush: - * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held * while this is in progress. */ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ @@ -643,25 +729,25 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ ci = (cache_item_t *) value; cfd = (callback_flush_data_t *) data; + if (ci->flags & CI_FLAGS_IN_QUEUE) + return FALSE; + if ((ci->last_flush_time <= cfd->abs_timeout) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num > 0)) { enqueue_cache_item (ci, TAIL); } else if ((do_shutdown != 0) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num > 0)) { enqueue_cache_item (ci, TAIL); } else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num <= 0)) { char **temp; - temp = (char **) realloc (cfd->keys, + temp = (char **) rrd_realloc (cfd->keys, sizeof (char *) * (cfd->keys_num + 1)); if (temp == NULL) { @@ -705,7 +791,7 @@ static int flush_old_values (int max_age) { /* should never fail, since we have held the cache_lock * the entire time */ - assert( forget_file(cfd.keys[k]) == 0 ); + assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); } if (cfd.keys != NULL) @@ -717,27 +803,20 @@ static int flush_old_values (int max_age) return (0); } /* int flush_old_values */ -static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ { struct timeval now; struct timespec next_flush; - int final_flush = 0; /* make sure we only flush once on shutdown */ + int status; gettimeofday (&now, NULL); next_flush.tv_sec = now.tv_sec + config_flush_interval; next_flush.tv_nsec = 1000 * now.tv_usec; - pthread_mutex_lock (&cache_lock); - while ((do_shutdown == 0) || (cache_queue_head != NULL)) - { - cache_item_t *ci; - char *file; - char **values; - int values_num; - int status; - int i; + pthread_mutex_lock(&cache_lock); - /* First, check if it's time to do the cache flush. */ + while (!do_shutdown) + { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) || ((now.tv_sec == next_flush.tv_sec) @@ -758,28 +837,48 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); } + status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush); + if (status != 0 && status != ETIMEDOUT) + { + RRDD_LOG (LOG_ERR, "flush_thread_main: " + "pthread_cond_timedwait returned %i.", status); + } + } + + if (config_flush_at_shutdown) + flush_old_values (-1); /* flush everything */ + + pthread_mutex_unlock(&cache_lock); + + return NULL; +} /* void *flush_thread_main */ + +static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + pthread_mutex_lock (&cache_lock); + + while (!do_shutdown + || (cache_queue_head != NULL && config_flush_at_shutdown)) + { + cache_item_t *ci; + char *file; + char **values; + int values_num; + int status; + int i; + /* Now, check if there's something to store away. If not, wait until - * something comes in or it's time to do the cache flush. if we are - * shutting down, do not wait around. */ + * something comes in. if we are shutting down, do not wait around. */ if (cache_queue_head == NULL && !do_shutdown) { - status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush); + status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) { RRDD_LOG (LOG_ERR, "queue_thread_main: " - "pthread_cond_timedwait returned %i.", status); + "pthread_cond_wait returned %i.", status); } } - /* We're about to shut down */ - if (do_shutdown != 0 && !final_flush++) - { - if (config_flush_at_shutdown) - flush_old_values (-1); /* flush everything */ - else - break; - } - /* Check if a value has arrived. This may be NULL if we timed out or there * was an interrupt such as a signal. */ if (cache_queue_head == NULL) @@ -804,11 +903,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ wipe_ci_values(ci, time(NULL)); remove_from_queue(ci); - pthread_mutex_lock (&stats_lock); - assert (stats_queue_length > 0); - stats_queue_length--; - pthread_mutex_unlock (&stats_lock); - pthread_mutex_unlock (&cache_lock); rrd_clear_error (); @@ -838,25 +932,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } pthread_mutex_lock (&cache_lock); - - /* We're about to shut down */ - if (do_shutdown != 0 && !final_flush++) - { - if (config_flush_at_shutdown) - flush_old_values (-1); /* flush everything */ - else - break; - } - } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ - pthread_mutex_unlock (&cache_lock); - - if (config_flush_at_shutdown) - { - assert(cache_queue_head == NULL); - RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); } - - journal_done(); + pthread_mutex_unlock (&cache_lock); return (NULL); } /* }}} void *queue_thread_main */ @@ -956,6 +1033,26 @@ err: return 0; } /* }}} static int check_file_access */ +/* when using a base dir, convert relative paths to absolute paths. + * if necessary, modifies the "filename" pointer to point + * to the new path created in "tmp". "tmp" is provided + * by the caller and sizeof(tmp) must be >= PATH_MAX. + * + * this allows us to optimize for the expected case (absolute path) + * with a no-op. + */ +static void get_abs_path(char **filename, char *tmp) +{ + assert(tmp != NULL); + assert(filename != NULL && *filename != NULL); + + if (config_base_dir == NULL || **filename == '/') + return; + + snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename); + *filename = tmp; +} /* }}} static int get_abs_path */ + /* returns 1 if we have the required privilege level, * otherwise issue an error to the user on sock */ static int has_privilege (listen_socket_t *sock, /* {{{ */ @@ -998,141 +1095,17 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ -static int handle_request_help (listen_socket_t *sock, /* {{{ */ - char *buffer, size_t buffer_size) +static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */ { - int status; - char **help_text; - char *command; - - char *help_help[2] = - { - "Command overview\n" - , - "HELP []\n" - "FLUSH \n" - "FLUSHALL\n" - "PENDING \n" - "FORGET \n" - "UPDATE [ ...]\n" - "BATCH\n" - "STATS\n" - }; - - char *help_flush[2] = - { - "Help for FLUSH\n" - , - "Usage: FLUSH \n" - "\n" - "Adds the given filename to the head of the update queue and returns\n" - "after is has been dequeued.\n" - }; - - char *help_flushall[2] = - { - "Help for FLUSHALL\n" - , - "Usage: FLUSHALL\n" - "\n" - "Triggers writing of all pending updates. Returns immediately.\n" - }; - - char *help_pending[2] = - { - "Help for PENDING\n" - , - "Usage: PENDING \n" - "\n" - "Shows any 'pending' updates for a file, in order.\n" - "The updates shown have not yet been written to the underlying RRD file.\n" - }; - - char *help_forget[2] = - { - "Help for FORGET\n" - , - "Usage: FORGET \n" - "\n" - "Removes the file completely from the cache.\n" - "Any pending updates for the file will be lost.\n" - }; - - char *help_update[2] = - { - "Help for UPDATE\n" - , - "Usage: UPDATE [ ...]\n" - "\n" - "Adds the given file to the internal cache if it is not yet known and\n" - "appends the given value(s) to the entry. See the rrdcached(1) manpage\n" - "for details.\n" - "\n" - "Each has the following form:\n" - " =