X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=a0b836487f8e9c3d3be5b5258cdd9042c3df459b;hp=0816526bd43a808e549dc4c68b1b4f04aa7dbf82;hb=afdd543a34762dba360a103e9b17f38b64be83ff;hpb=a12627275ff8487174cbb907a066f62a00b6ae44 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 0816526..a0b8364 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -119,7 +119,7 @@ struct cache_item_s #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) int flags; - + pthread_cond_t flushed; cache_item_t *next; }; @@ -165,8 +165,6 @@ static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; -static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; - static int config_write_interval = 300; static int config_write_jitter = 0; static int config_flush_interval = 3600; @@ -197,45 +195,74 @@ static void journal_rotate(void); /* * Functions */ -static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_common (const char *sig) /* {{{ */ { - RRDD_LOG(LOG_NOTICE, "caught SIGINT"); + RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); do_shutdown++; pthread_cond_broadcast(&cache_cond); +} /* }}} void sig_common */ + +static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +{ + sig_common("INT"); } /* }}} void sig_int_handler */ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ { - RRDD_LOG(LOG_NOTICE, "caught SIGTERM"); - do_shutdown++; - pthread_cond_broadcast(&cache_cond); + sig_common("TERM"); } /* }}} void sig_term_handler */ -static int write_pidfile (void) /* {{{ */ +static void install_signal_handlers(void) /* {{{ */ +{ + /* These structures are static, because `sigaction' behaves weird if the are + * overwritten.. */ + static struct sigaction sa_int; + static struct sigaction sa_term; + static struct sigaction sa_pipe; + + /* Install signal handlers */ + memset (&sa_int, 0, sizeof (sa_int)); + sa_int.sa_handler = sig_int_handler; + sigaction (SIGINT, &sa_int, NULL); + + memset (&sa_term, 0, sizeof (sa_term)); + sa_term.sa_handler = sig_term_handler; + sigaction (SIGTERM, &sa_term, NULL); + + memset (&sa_pipe, 0, sizeof (sa_pipe)); + sa_pipe.sa_handler = SIG_IGN; + sigaction (SIGPIPE, &sa_pipe, NULL); + +} /* }}} void install_signal_handlers */ + +static int open_pidfile(void) /* {{{ */ { - pid_t pid; - char *file; int fd; - FILE *fh; + char *file; - pid = getpid (); - file = (config_pid_file != NULL) ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) - { - RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", - file, rrd_strerror(errno)); - return (-1); - } + fprintf(stderr, "FATAL: cannot create '%s' (%s)\n", + file, rrd_strerror(errno)); + + return(fd); +} + +static int write_pidfile (int fd) /* {{{ */ +{ + pid_t pid; + FILE *fh; + + pid = getpid (); fh = fdopen (fd, "w"); if (fh == NULL) { - RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed."); close(fd); return (-1); } @@ -427,6 +454,7 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ if (did_insert) { + pthread_cond_broadcast(&cache_cond); pthread_mutex_lock (&stats_lock); stats_queue_length++; pthread_mutex_unlock (&stats_lock); @@ -499,7 +527,7 @@ static int flush_old_values (int max_age) if (max_age > 0) cfd.abs_timeout = cfd.now - max_age; else - cfd.abs_timeout = cfd.now + 1; + cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1; /* `tree_callback_flush' will return the keys of all values that haven't * been touched in the last `config_flush_interval' seconds in `cfd'. @@ -640,6 +668,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } journal_write("wrote", file); + pthread_cond_broadcast(&ci->flushed); for (i = 0; i < values_num; i++) free (values[i]); @@ -656,7 +685,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } pthread_mutex_lock (&cache_lock); - pthread_cond_broadcast (&flush_cond); /* We're about to shut down, so lets flush the entire tree. */ if ((do_shutdown != 0) && (cache_queue_head == NULL)) @@ -749,25 +777,10 @@ static int flush_file (const char *filename) /* {{{ */ /* Enqueue at head */ enqueue_cache_item (ci, HEAD); - pthread_cond_signal (&cache_cond); - - while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) - { - ci = NULL; - - pthread_cond_wait (&flush_cond, &cache_lock); - ci = g_tree_lookup (cache_tree, filename); - if (ci == NULL) - { - RRDD_LOG (LOG_ERR, "flush_file: Tree node went away " - "while waiting for flush."); - pthread_mutex_unlock (&cache_lock); - return (-1); - } - } + pthread_cond_wait(&ci->flushed, &cache_lock); + pthread_mutex_unlock(&cache_lock); - pthread_mutex_unlock (&cache_lock); return (0); } /* }}} int flush_file */ @@ -782,8 +795,9 @@ static int handle_request_help (int fd, /* {{{ */ char *help_help[] = { - "4 Command overview\n", + "5 Command overview\n", "FLUSH \n", + "FLUSHALL\n", "HELP []\n", "UPDATE [ ...]\n", "STATS\n" @@ -800,6 +814,15 @@ static int handle_request_help (int fd, /* {{{ */ }; size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); + char *help_flushall[] = + { + "3 Help for FLUSHALL\n", + "Usage: FLUSHALL\n", + "\n", + "Triggers writing of all pending updates. Returns immediately.\n" + }; + size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]); + char *help_update[] = { "9 Help for UPDATE\n", @@ -843,6 +866,11 @@ static int handle_request_help (int fd, /* {{{ */ help_text = help_flush; help_text_len = help_flush_len; } + else if (strcasecmp (command, "flushall") == 0) + { + help_text = help_flushall; + help_text_len = help_flushall_len; + } else if (strcasecmp (command, "stats") == 0) { help_text = help_stats; @@ -1005,6 +1033,27 @@ static int handle_request_flush (int fd, /* {{{ */ return (0); } /* }}} int handle_request_flush */ +static int handle_request_flushall(int fd) /* {{{ */ +{ + int status; + char answer[] ="0 Started flush.\n"; + + RRDD_LOG(LOG_DEBUG, "Received FLUSHALL"); + + pthread_mutex_lock(&cache_lock); + flush_old_values(-1); + pthread_mutex_unlock(&cache_lock); + + status = swrite(fd, answer, strlen(answer)); + if (status < 0) + { + status = errno; + RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error."); + } + + return (status); +} + static int handle_request_update (int fd, /* {{{ */ char *buffer, size_t buffer_size) { @@ -1043,17 +1092,19 @@ static int handle_request_update (int fd, /* {{{ */ pthread_mutex_unlock(&stats_lock); pthread_mutex_lock (&cache_lock); - ci = g_tree_lookup (cache_tree, file); + if (ci == NULL) /* {{{ */ { struct stat statbuf; + /* don't hold the lock while we setup; stat(2) might block */ + pthread_mutex_unlock(&cache_lock); + memset (&statbuf, 0, sizeof (statbuf)); status = stat (file, &statbuf); if (status != 0) { - pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file); status = errno; @@ -1067,16 +1118,12 @@ static int handle_request_update (int fd, /* {{{ */ } if (!S_ISREG (statbuf.st_mode)) { - pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file); RRDD_UPDATE_SEND; return (0); } if (access(file, R_OK|W_OK) != 0) { - pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n", file, rrd_strerror(errno)); RRDD_UPDATE_SEND; @@ -1086,7 +1133,6 @@ static int handle_request_update (int fd, /* {{{ */ ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) { - pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); strncpy (answer, "-1 malloc failed.\n", sizeof (answer)); @@ -1098,7 +1144,6 @@ static int handle_request_update (int fd, /* {{{ */ ci->file = strdup (file); if (ci->file == NULL) { - pthread_mutex_unlock (&cache_lock); free (ci); RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); @@ -1110,6 +1155,7 @@ static int handle_request_update (int fd, /* {{{ */ _wipe_ci_values(ci, now); ci->flags = CI_FLAGS_IN_TREE; + pthread_mutex_lock(&cache_lock); g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); } /* }}} */ assert (ci != NULL); @@ -1151,7 +1197,6 @@ static int handle_request_update (int fd, /* {{{ */ && (ci->values_num > 0)) { enqueue_cache_item (ci, TAIL); - pthread_cond_signal (&cache_cond); } pthread_mutex_unlock (&cache_lock); @@ -1239,6 +1284,10 @@ static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */ { return (handle_request_flush (fd, buffer_ptr, buffer_size)); } + else if (strcasecmp (command, "flushall") == 0) + { + return (handle_request_flushall(fd)); + } else if (strcasecmp (command, "stats") == 0) { return (handle_request_stats (fd, buffer_ptr, buffer_size)); @@ -1479,12 +1528,11 @@ static void *connection_thread_main (void *args) /* {{{ */ status = handle_request (fd, buffer, /*buffer_size=*/ status); if (status != 0) - { - close (fd); break; - } } + close(fd); + self = pthread_self (); /* Remove this thread from the connection threads list */ pthread_mutex_lock (&connection_threads_lock); @@ -1563,14 +1611,21 @@ static int open_listen_socket_unix (const char *path) /* {{{ */ return (0); } /* }}} int open_listen_socket_unix */ -static int open_listen_socket (const char *addr) /* {{{ */ +static int open_listen_socket (const char *addr_orig) /* {{{ */ { struct addrinfo ai_hints; struct addrinfo *ai_res; struct addrinfo *ai_ptr; + char addr_copy[NI_MAXHOST]; + char *addr; + char *port; int status; - assert (addr != NULL); + assert (addr_orig != NULL); + + strncpy (addr_copy, addr_orig, sizeof (addr_copy)); + addr_copy[sizeof (addr_copy) - 1] = 0; + addr = addr_copy; if (strncmp ("unix:", addr, strlen ("unix:")) == 0) return (open_listen_socket_unix (addr + strlen ("unix:"))); @@ -1585,8 +1640,46 @@ static int open_listen_socket (const char *addr) /* {{{ */ ai_hints.ai_family = AF_UNSPEC; ai_hints.ai_socktype = SOCK_STREAM; + port = NULL; + if (*addr == '[') /* IPv6+port format */ + { + /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */ + addr++; + + port = strchr (addr, ']'); + if (port == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s", + addr_orig); + return (-1); + } + *port = 0; + port++; + + if (*port == ':') + port++; + else if (*port == 0) + port = NULL; + else + { + RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s", + port); + return (-1); + } + } /* if (*addr = ']') */ + else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */ + { + port = rindex(addr, ':'); + if (port != NULL) + { + *port = 0; + port++; + } + } ai_res = NULL; - status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); + status = getaddrinfo (addr, + port == NULL ? RRDCACHED_DEFAULT_PORT : port, + &ai_hints, &ai_res); if (status != 0) { RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: " @@ -1598,6 +1691,7 @@ static int open_listen_socket (const char *addr) /* {{{ */ { int fd; listen_socket_t *temp; + int one = 1; temp = (listen_socket_t *) realloc (listen_fds, sizeof (listen_fds[0]) * (listen_fds_num + 1)); @@ -1616,6 +1710,8 @@ static int open_listen_socket (const char *addr) /* {{{ */ continue; } + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); if (status != 0) { @@ -1700,8 +1796,12 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ pollfds[i].revents = 0; } - status = poll (pollfds, pollfds_num, /* timeout = */ -1); - if (status < 1) + status = poll (pollfds, pollfds_num, /* timeout = */ 1000); + if (status == 0) + { + continue; /* timeout */ + } + else if (status < 0) { status = errno; if (status != EINTR) @@ -1784,12 +1884,10 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ static int daemonize (void) /* {{{ */ { int status; + int fd; - /* These structures are static, because `sigaction' behaves weird if the are - * overwritten.. */ - static struct sigaction sa_int; - static struct sigaction sa_term; - static struct sigaction sa_pipe; + fd = open_pidfile(); + if (fd < 0) return fd; if (!stay_foreground) { @@ -1831,18 +1929,7 @@ static int daemonize (void) /* {{{ */ dup (0); } /* if (!stay_foreground) */ - /* Install signal handlers */ - memset (&sa_int, 0, sizeof (sa_int)); - sa_int.sa_handler = sig_int_handler; - sigaction (SIGINT, &sa_int, NULL); - - memset (&sa_term, 0, sizeof (sa_term)); - sa_term.sa_handler = sig_term_handler; - sigaction (SIGTERM, &sa_term, NULL); - - memset (&sa_pipe, 0, sizeof (sa_pipe)); - sa_pipe.sa_handler = SIG_IGN; - sigaction (SIGPIPE, &sa_pipe, NULL); + install_signal_handlers(); openlog ("rrdcached", LOG_PID, LOG_DAEMON); RRDD_LOG(LOG_INFO, "starting up"); @@ -1854,7 +1941,7 @@ static int daemonize (void) /* {{{ */ return (-1); } - status = write_pidfile (); + status = write_pidfile (fd); return status; } /* }}} int daemonize */ @@ -2033,17 +2120,19 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'h': case '?': - printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" + printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n" "\n" "Usage: rrdcached [options]\n" "\n" "Valid options are:\n" " -l
Socket address to listen to.\n" " -w Interval in which to write data.\n" - " -z Delay writes up to seconds to spread load" \ + " -z Delay writes up to seconds to spread load\n" " -f Interval in which to flush dead data.\n" " -p Location of the PID-file.\n" " -b Base directory to change to.\n" + " -g Do not fork and run in the foreground.\n" + " -j Directory in which to create the journal files.\n" "\n" "For more information and a detailed description of all options " "please refer\n"