X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=f1b8f8bb5eb455fbc05e0b195569ab988cd5c45e;hb=538086ef0bac2d41ec6a00177c2028baae265daa;hp=6f01f899cb8e03629257d4ed757ab63854af1753;hpb=e2d66d6782fe88a3900908c76b3a6fd7affbaea0;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 6f01f89..f1b8f8b 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -119,7 +119,7 @@ struct cache_item_s #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) int flags; - + pthread_cond_t flushed; cache_item_t *next; }; @@ -165,8 +165,6 @@ static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; -static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; - static int config_write_interval = 300; static int config_write_jitter = 0; static int config_flush_interval = 3600; @@ -211,31 +209,34 @@ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ pthread_cond_broadcast(&cache_cond); } /* }}} void sig_term_handler */ -static int write_pidfile (void) /* {{{ */ +static int open_pidfile(void) /* {{{ */ { - pid_t pid; - char *file; int fd; - FILE *fh; + char *file; - pid = getpid (); - file = (config_pid_file != NULL) ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) - { - RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", - file, rrd_strerror(errno)); - return (-1); - } + fprintf(stderr, "FATAL: cannot create '%s' (%s)\n", + file, rrd_strerror(errno)); + + return(fd); +} + +static int write_pidfile (int fd) /* {{{ */ +{ + pid_t pid; + FILE *fh; + + pid = getpid (); fh = fdopen (fd, "w"); if (fh == NULL) { - RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed."); close(fd); return (-1); } @@ -499,7 +500,7 @@ static int flush_old_values (int max_age) if (max_age > 0) cfd.abs_timeout = cfd.now - max_age; else - cfd.abs_timeout = cfd.now + 1; + cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1; /* `tree_callback_flush' will return the keys of all values that haven't * been touched in the last `config_flush_interval' seconds in `cfd'. @@ -640,6 +641,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } journal_write("wrote", file); + pthread_cond_broadcast(&ci->flushed); for (i = 0; i < values_num; i++) free (values[i]); @@ -656,7 +658,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } pthread_mutex_lock (&cache_lock); - pthread_cond_broadcast (&flush_cond); /* We're about to shut down, so lets flush the entire tree. */ if ((do_shutdown != 0) && (cache_queue_head == NULL)) @@ -751,23 +752,9 @@ static int flush_file (const char *filename) /* {{{ */ enqueue_cache_item (ci, HEAD); pthread_cond_signal (&cache_cond); - while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) - { - ci = NULL; - - pthread_cond_wait (&flush_cond, &cache_lock); - - ci = g_tree_lookup (cache_tree, filename); - if (ci == NULL) - { - RRDD_LOG (LOG_ERR, "flush_file: Tree node went away " - "while waiting for flush."); - pthread_mutex_unlock (&cache_lock); - return (-1); - } - } + pthread_cond_wait(&ci->flushed, &cache_lock); + pthread_mutex_unlock(&cache_lock); - pthread_mutex_unlock (&cache_lock); return (0); } /* }}} int flush_file */ @@ -1043,17 +1030,19 @@ static int handle_request_update (int fd, /* {{{ */ pthread_mutex_unlock(&stats_lock); pthread_mutex_lock (&cache_lock); - ci = g_tree_lookup (cache_tree, file); + if (ci == NULL) /* {{{ */ { struct stat statbuf; + /* don't hold the lock while we setup; stat(2) might block */ + pthread_mutex_unlock(&cache_lock); + memset (&statbuf, 0, sizeof (statbuf)); status = stat (file, &statbuf); if (status != 0) { - pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file); status = errno; @@ -1067,16 +1056,12 @@ static int handle_request_update (int fd, /* {{{ */ } if (!S_ISREG (statbuf.st_mode)) { - pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file); RRDD_UPDATE_SEND; return (0); } if (access(file, R_OK|W_OK) != 0) { - pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n", file, rrd_strerror(errno)); RRDD_UPDATE_SEND; @@ -1086,7 +1071,6 @@ static int handle_request_update (int fd, /* {{{ */ ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) { - pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); strncpy (answer, "-1 malloc failed.\n", sizeof (answer)); @@ -1098,7 +1082,6 @@ static int handle_request_update (int fd, /* {{{ */ ci->file = strdup (file); if (ci->file == NULL) { - pthread_mutex_unlock (&cache_lock); free (ci); RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); @@ -1110,6 +1093,7 @@ static int handle_request_update (int fd, /* {{{ */ _wipe_ci_values(ci, now); ci->flags = CI_FLAGS_IN_TREE; + pthread_mutex_lock(&cache_lock); g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); } /* }}} */ assert (ci != NULL); @@ -1479,12 +1463,11 @@ static void *connection_thread_main (void *args) /* {{{ */ status = handle_request (fd, buffer, /*buffer_size=*/ status); if (status != 0) - { - close (fd); break; - } } + close(fd); + self = pthread_self (); /* Remove this thread from the connection threads list */ pthread_mutex_lock (&connection_threads_lock); @@ -1563,15 +1546,21 @@ static int open_listen_socket_unix (const char *path) /* {{{ */ return (0); } /* }}} int open_listen_socket_unix */ -static int open_listen_socket (const char *addr) /* {{{ */ +static int open_listen_socket (const char *addr_orig) /* {{{ */ { struct addrinfo ai_hints; struct addrinfo *ai_res; struct addrinfo *ai_ptr; + char addr_copy[NI_MAXHOST]; + char *addr; char *port; int status; - assert (addr != NULL); + assert (addr_orig != NULL); + + strncpy (addr_copy, addr_orig, sizeof (addr_copy)); + addr_copy[sizeof (addr_copy) - 1] = 0; + addr = addr_copy; if (strncmp ("unix:", addr, strlen ("unix:")) == 0) return (open_listen_socket_unix (addr + strlen ("unix:"))); @@ -1586,10 +1575,42 @@ static int open_listen_socket (const char *addr) /* {{{ */ ai_hints.ai_family = AF_UNSPEC; ai_hints.ai_socktype = SOCK_STREAM; - port = rindex(addr, ':'); - if (port != NULL) - *port++ = '\0'; + port = NULL; + if (*addr == '[') /* IPv6+port format */ + { + /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */ + addr++; + + port = strchr (addr, ']'); + if (port == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s", + addr_orig); + return (-1); + } + *port = 0; + port++; + if (*port == ':') + port++; + else if (*port == 0) + port = NULL; + else + { + RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s", + port); + return (-1); + } + } /* if (*addr = ']') */ + else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */ + { + port = rindex(addr, ':'); + if (port != NULL) + { + *port = 0; + port++; + } + } ai_res = NULL; status = getaddrinfo (addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port, @@ -1605,6 +1626,7 @@ static int open_listen_socket (const char *addr) /* {{{ */ { int fd; listen_socket_t *temp; + int one = 1; temp = (listen_socket_t *) realloc (listen_fds, sizeof (listen_fds[0]) * (listen_fds_num + 1)); @@ -1623,6 +1645,8 @@ static int open_listen_socket (const char *addr) /* {{{ */ continue; } + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); if (status != 0) { @@ -1707,8 +1731,12 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ pollfds[i].revents = 0; } - status = poll (pollfds, pollfds_num, /* timeout = */ -1); - if (status < 1) + status = poll (pollfds, pollfds_num, /* timeout = */ 1000); + if (status == 0) + { + continue; /* timeout */ + } + else if (status < 0) { status = errno; if (status != EINTR) @@ -1791,6 +1819,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ static int daemonize (void) /* {{{ */ { int status; + int fd; /* These structures are static, because `sigaction' behaves weird if the are * overwritten.. */ @@ -1798,6 +1827,9 @@ static int daemonize (void) /* {{{ */ static struct sigaction sa_term; static struct sigaction sa_pipe; + fd = open_pidfile(); + if (fd < 0) return fd; + if (!stay_foreground) { pid_t child; @@ -1861,7 +1893,7 @@ static int daemonize (void) /* {{{ */ return (-1); } - status = write_pidfile (); + status = write_pidfile (fd); return status; } /* }}} int daemonize */