X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=9fbbc1133af5a79869a53e84657086b91d4f0b60;hp=ea607d83df80810ddecd660e4bd5552cb05f5a22;hb=607eea59c2ba85c63e2de1b665b37a6093e1575c;hpb=814c75ac076932467801c10ff48f8cd6534fde5a diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index ea607d8..9fbbc11 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -117,7 +117,7 @@ struct listen_socket_s socket_privilege privilege; /* state for BATCH processing */ - int batch_mode; + time_t batch_start; int batch_cmd; /* buffered IO */ @@ -138,6 +138,7 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; + time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) int flags; @@ -285,7 +286,7 @@ static void install_signal_handlers(void) /* {{{ */ } /* }}} void install_signal_handlers */ -static int open_pidfile(void) /* {{{ */ +static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; char *file; @@ -294,14 +295,52 @@ static int open_pidfile(void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) - fprintf(stderr, "FATAL: cannot create '%s' (%s)\n", - file, rrd_strerror(errno)); + fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", + action, file, rrd_strerror(errno)); return(fd); } /* }}} static int open_pidfile */ +/* check existing pid file to see whether a daemon is running */ +static int check_pidfile(void) +{ + int pid_fd; + pid_t pid; + char pid_str[16]; + + pid_fd = open_pidfile("open", O_RDWR); + if (pid_fd < 0) + return pid_fd; + + if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0) + return -1; + + pid = atoi(pid_str); + if (pid <= 0) + return -1; + + /* another running process that we can signal COULD be + * a competing rrdcached */ + if (pid != getpid() && kill(pid, 0) == 0) + { + fprintf(stderr, + "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid); + close(pid_fd); + return -1; + } + + lseek(pid_fd, 0, SEEK_SET); + ftruncate(pid_fd, 0); + + fprintf(stderr, + "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" + "rrdcached: starting normally.\n", pid); + + return pid_fd; +} /* }}} static int check_pidfile */ + static int write_pidfile (int fd) /* {{{ */ { pid_t pid; @@ -404,7 +443,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ int len; if (sock == NULL) return 0; /* journal replay mode */ - if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */ + if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); #ifdef HAVE_VSNPRINTF @@ -452,7 +491,7 @@ static int send_response (listen_socket_t *sock, response_code rc, if (sock == NULL) return rc; /* journal replay mode */ - if (sock->batch_mode) + if (sock->batch_start) { if (rc == RESP_OK) return rc; /* no response on success during BATCH */ @@ -477,7 +516,7 @@ static int send_response (listen_socket_t *sock, response_code rc, len += rclen; /* append the result to the wbuf, don't write to the user */ - if (sock->batch_mode) + if (sock->batch_start) return add_to_wbuf(sock, buffer, len); /* first write must be complete */ @@ -487,7 +526,7 @@ static int send_response (listen_socket_t *sock, response_code rc, return -1; } - if (sock->wbuf != NULL) + if (sock->wbuf != NULL && rc == RESP_OK) { wrote = 0; while (wrote < sock->wbuf_len) @@ -540,6 +579,34 @@ static void remove_from_queue(cache_item_t *ci) /* {{{ */ ci->flags &= ~CI_FLAGS_IN_QUEUE; } /* }}} static void remove_from_queue */ +/* remove an entry from the tree and free all its resources. + * must hold 'cache lock' while calling this. + * returns 0 on success, otherwise errno */ +static int forget_file(const char *file) +{ + cache_item_t *ci; + + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) + return ENOENT; + + g_tree_remove (cache_tree, file); + remove_from_queue(ci); + + for (int i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free (ci->values); + free (ci->file); + + /* in case anyone is waiting */ + pthread_cond_broadcast(&ci->flushed); + + free (ci); + + return 0; +} /* }}} static int forget_file */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -674,26 +741,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { - cache_item_t *ci; - - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ + /* should never fail, since we have held the cache_lock + * the entire time */ + assert( forget_file(cfd.keys[k]) == 0 ); + } if (cfd.keys != NULL) { @@ -735,8 +786,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ flush_old_values (config_write_interval); /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec <= now.tv_sec) - next_flush.tv_sec += config_flush_interval; + next_flush.tv_sec = + now.tv_sec + next_flush.tv_sec % config_flush_interval; /* unlock the cache while we rotate so we don't block incoming * updates if the fsync() blocks on disk I/O */ @@ -943,6 +994,40 @@ err: return 0; } /* }}} static int check_file_access */ +/* when using a base dir, convert relative paths to absolute paths. + * if necessary, modifies the "filename" pointer to point + * to the new path created in "tmp". "tmp" is provided + * by the caller and sizeof(tmp) must be >= PATH_MAX. + * + * this allows us to optimize for the expected case (absolute path) + * with a no-op. + */ +static void get_abs_path(char **filename, char *tmp) +{ + assert(tmp != NULL); + assert(filename != NULL && *filename != NULL); + + if (config_base_dir == NULL || **filename == '/') + return; + + snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename); + *filename = tmp; +} /* }}} static int get_abs_path */ + +/* returns 1 if we have the required privilege level, + * otherwise issue an error to the user on sock */ +static int has_privilege (listen_socket_t *sock, /* {{{ */ + socket_privilege priv) +{ + if (sock == NULL) /* journal replay */ + return 1; + + if (sock->privilege >= priv) + return 1; + + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); +} /* }}} static int has_privilege */ + static int flush_file (const char *filename) /* {{{ */ { cache_item_t *ci; @@ -963,6 +1048,9 @@ static int flush_file (const char *filename) /* {{{ */ pthread_cond_wait(&ci->flushed, &cache_lock); } + /* DO NOT DO ANYTHING WITH ci HERE!! The entry + * may have been purged during our cond_wait() */ + pthread_mutex_unlock(&cache_lock); return (0); @@ -979,9 +1067,11 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ { "Command overview\n" , + "HELP []\n" "FLUSH \n" "FLUSHALL\n" - "HELP []\n" + "PENDING \n" + "FORGET \n" "UPDATE [ ...]\n" "BATCH\n" "STATS\n" @@ -1006,6 +1096,26 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ "Triggers writing of all pending updates. Returns immediately.\n" }; + char *help_pending[2] = + { + "Help for PENDING\n" + , + "Usage: PENDING \n" + "\n" + "Shows any 'pending' updates for a file, in order.\n" + "The updates shown have not yet been written to the underlying RRD file.\n" + }; + + char *help_forget[2] = + { + "Help for FORGET\n" + , + "Usage: FORGET \n" + "\n" + "Removes the file completely from the cache.\n" + "Any pending updates for the file will be lost.\n" + }; + char *help_update[2] = { "Help for UPDATE\n" @@ -1064,6 +1174,10 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ help_text = help_flush; else if (strcasecmp (command, "flushall") == 0) help_text = help_flushall; + else if (strcasecmp (command, "pending") == 0) + help_text = help_pending; + else if (strcasecmp (command, "forget") == 0) + help_text = help_forget; else if (strcasecmp (command, "stats") == 0) help_text = help_stats; else if (strcasecmp (command, "batch") == 0) @@ -1127,7 +1241,7 @@ static int handle_request_stats (listen_socket_t *sock) /* {{{ */ static int handle_request_flush (listen_socket_t *sock, /* {{{ */ char *buffer, size_t buffer_size) { - char *file; + char *file, file_tmp[PATH_MAX]; int status; status = buffer_get_field (&buffer, &buffer_size, &file); @@ -1141,6 +1255,7 @@ static int handle_request_flush (listen_socket_t *sock, /* {{{ */ stats_flush_received++; pthread_mutex_unlock(&stats_lock); + get_abs_path(&file, file_tmp); if (!check_file_access(file, sock)) return 0; status = flush_file (file); @@ -1165,10 +1280,15 @@ static int handle_request_flush (listen_socket_t *sock, /* {{{ */ /* NOTREACHED */ assert(1==0); -} /* }}} int handle_request_slurp */ +} /* }}} int handle_request_flush */ static int handle_request_flushall(listen_socket_t *sock) /* {{{ */ { + int status; + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; RRDD_LOG(LOG_DEBUG, "Received FLUSHALL"); @@ -1179,17 +1299,94 @@ static int handle_request_flushall(listen_socket_t *sock) /* {{{ */ return send_response(sock, RESP_OK, "Started flush.\n"); } /* }}} static int handle_request_flushall */ +static int handle_request_pending(listen_socket_t *sock, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char *file, file_tmp[PATH_MAX]; + cache_item_t *ci; + + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return send_response(sock, RESP_ERR, + "Usage: PENDING \n"); + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + get_abs_path(&file, file_tmp); + + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) + { + pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); + } + + for (int i=0; i < ci->values_num; i++) + add_response_info(sock, "%s\n", ci->values[i]); + + pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_OK, "updates pending\n"); +} /* }}} static int handle_request_pending */ + +static int handle_request_forget(listen_socket_t *sock, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char *file, file_tmp[PATH_MAX]; + + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return send_response(sock, RESP_ERR, + "Usage: FORGET \n"); + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) return 0; + + pthread_mutex_lock(&cache_lock); + status = forget_file(file); + pthread_mutex_unlock(&cache_lock); + + if (status == 0) + { + if (sock != NULL) + journal_write("forget", file); + + return send_response(sock, RESP_OK, "Gone!\n"); + } + else + return send_response(sock, RESP_ERR, "cannot forget: %s\n", + status < 0 ? "Internal error" : rrd_strerror(status)); + + /* NOTREACHED */ + assert(1==0); +} /* }}} static int handle_request_forget */ + static int handle_request_update (listen_socket_t *sock, /* {{{ */ - char *buffer, size_t buffer_size) + time_t now, + char *buffer, size_t buffer_size) { - char *file; + char *file, file_tmp[PATH_MAX]; int values_num = 0; + int bad_timestamps = 0; int status; + char orig_buf[CMD_MAX]; - time_t now; cache_item_t *ci; - now = time (NULL); + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + /* save it for the journal later */ + strncpy(orig_buf, buffer, sizeof(orig_buf)-1); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -1200,6 +1397,7 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ stats_updates_received++; pthread_mutex_unlock(&stats_lock); + get_abs_path(&file, file_tmp); if (!check_file_access(file, sock)) return 0; pthread_mutex_lock (&cache_lock); @@ -1258,10 +1456,16 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ } /* }}} */ assert (ci != NULL); + /* don't re-write updates in replay mode */ + if (sock != NULL) + journal_write("update", orig_buf); + while (buffer_size > 0) { char **temp; char *value; + time_t stamp; + char *eostamp; status = buffer_get_field (&buffer, &buffer_size, &value); if (status != 0) @@ -1270,6 +1474,26 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ break; } + /* make sure update time is always moving forward */ + stamp = strtol(value, &eostamp, 10); + if (eostamp == value || eostamp == NULL || *eostamp != ':') + { + ++bad_timestamps; + add_response_info(sock, "Cannot find timestamp in '%s'!\n", value); + continue; + } + else if (stamp <= ci->last_update_stamp) + { + ++bad_timestamps; + add_response_info(sock, + "illegal attempt to update using time %ld when" + " last update time is %ld (minimum one second step)\n", + stamp, ci->last_update_stamp); + continue; + } + else + ci->last_update_stamp = stamp; + temp = (char **) realloc (ci->values, sizeof (char *) * (ci->values_num + 1)); if (temp == NULL) @@ -1300,9 +1524,21 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ pthread_mutex_unlock (&cache_lock); if (values_num < 1) - return send_response(sock, RESP_ERR, "No values updated.\n"); + { + /* if we had only one update attempt, then return the full + error message... try to get the most information out + of the limited error space allowed by the protocol + */ + if (bad_timestamps == 1) + return send_response(sock, RESP_ERR, "%s", sock->wbuf); + else + return send_response(sock, RESP_ERR, + "No values updated (%d bad timestamps).\n", + bad_timestamps); + } else - return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num); + return send_response(sock, RESP_OK, + "errors, enqueued %i value(s).\n", values_num); /* NOTREACHED */ assert(1==0); @@ -1312,7 +1548,7 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ -static int handle_request_wrote (const char *buffer) /* {{{ */ +static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */ { int i; cache_item_t *ci; @@ -1335,7 +1571,7 @@ static int handle_request_wrote (const char *buffer) /* {{{ */ free(ci->values); } - wipe_ci_values(ci, time(NULL)); + wipe_ci_values(ci, now); remove_from_queue(ci); pthread_mutex_unlock(&cache_lock); @@ -1346,12 +1582,12 @@ static int handle_request_wrote (const char *buffer) /* {{{ */ static int batch_start (listen_socket_t *sock) /* {{{ */ { int status; - if (sock->batch_mode) + if (sock->batch_start) return send_response(sock, RESP_ERR, "Already in BATCH\n"); status = send_response(sock, RESP_OK, "Go ahead. End with dot '.' on its own line.\n"); - sock->batch_mode = 1; + sock->batch_start = time(NULL); sock->batch_cmd = 0; return status; @@ -1360,27 +1596,15 @@ static int batch_start (listen_socket_t *sock) /* {{{ */ /* finish "BATCH" processing and return results to the client */ static int batch_done (listen_socket_t *sock) /* {{{ */ { - assert(sock->batch_mode); - sock->batch_mode = 0; + assert(sock->batch_start); + sock->batch_start = 0; sock->batch_cmd = 0; return send_response(sock, RESP_OK, "errors\n"); } /* }}} static int batch_done */ -/* returns 1 if we have the required privilege level */ -static int has_privilege (listen_socket_t *sock, /* {{{ */ - socket_privilege priv) -{ - if (sock == NULL) /* journal replay */ - return 1; - - if (sock->privilege >= priv) - return 1; - - return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); -} /* }}} static int has_privilege */ - /* if sock==NULL, we are in journal replay mode */ static int handle_request (listen_socket_t *sock, /* {{{ */ + time_t now, char *buffer, size_t buffer_size) { char *buffer_ptr; @@ -1398,43 +1622,31 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (-1); } - if (sock != NULL && sock->batch_mode) + if (sock != NULL && sock->batch_start) sock->batch_cmd++; if (strcasecmp (command, "update") == 0) - { - status = has_privilege(sock, PRIV_HIGH); - if (status <= 0) - return status; - - /* don't re-write updates in replay mode */ - if (sock != NULL) - journal_write(command, buffer_ptr); - - return (handle_request_update (sock, buffer_ptr, buffer_size)); - } + return (handle_request_update (sock, now, buffer_ptr, buffer_size)); else if (strcasecmp (command, "wrote") == 0 && sock == NULL) { /* this is only valid in replay mode */ - return (handle_request_wrote (buffer_ptr)); + return (handle_request_wrote (buffer_ptr, now)); } else if (strcasecmp (command, "flush") == 0) return (handle_request_flush (sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "flushall") == 0) - { - status = has_privilege(sock, PRIV_HIGH); - if (status <= 0) - return status; - return (handle_request_flushall(sock)); - } + else if (strcasecmp (command, "pending") == 0) + return (handle_request_pending(sock, buffer_ptr, buffer_size)); + else if (strcasecmp (command, "forget") == 0) + return (handle_request_forget(sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "stats") == 0) return (handle_request_stats (sock)); else if (strcasecmp (command, "help") == 0) return (handle_request_help (sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "batch") == 0 && sock != NULL) return batch_start(sock); - else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode) + else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start) return batch_done(sock); else return send_response(sock, RESP_ERR, "Unknown command: %s\n", command); @@ -1550,6 +1762,7 @@ static int journal_replay (const char *file) /* {{{ */ int fail_cnt = 0; uint64_t line = 0; char entry[CMD_MAX]; + time_t now; if (file == NULL) return 0; @@ -1602,6 +1815,8 @@ static int journal_replay (const char *file) /* {{{ */ else RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file); + now = time(NULL); + while(!feof(fh)) { size_t entry_len; @@ -1623,7 +1838,7 @@ static int journal_replay (const char *file) /* {{{ */ entry[entry_len - 1] = '\0'; - if (handle_request(NULL, entry, entry_len) == 0) + if (handle_request(NULL, now, entry, entry_len) == 0) ++entry_cnt; else ++fail_cnt; @@ -1714,6 +1929,7 @@ static void *connection_thread_main (void *args) /* {{{ */ char *cmd; ssize_t cmd_len; ssize_t rbytes; + time_t now; struct pollfd pollfd; int status; @@ -1730,23 +1946,18 @@ static void *connection_thread_main (void *args) /* {{{ */ else if (status < 0) /* error */ { status = errno; - if (status == EINTR) - continue; - RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); + if (status != EINTR) + RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); continue; } if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ - { - close_connection(sock); break; - } else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) { RRDD_LOG (LOG_WARNING, "connection_thread_main: " "poll(2) returned something unexpected: %#04hx", pollfd.revents); - close_connection(sock); break; } @@ -1762,9 +1973,14 @@ static void *connection_thread_main (void *args) /* {{{ */ sock->next_read += rbytes; + if (sock->batch_start) + now = sock->batch_start; + else + now = time(NULL); + while ((cmd = next_cmd(sock, &cmd_len)) != NULL) { - status = handle_request (sock, cmd, cmd_len+1); + status = handle_request (sock, now, cmd, cmd_len+1); if (status != 0) goto out_close; } @@ -2140,13 +2356,16 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ static int daemonize (void) /* {{{ */ { int status; - int fd; + int pid_fd; char *base_dir; daemon_uid = geteuid(); - fd = open_pidfile(); - if (fd < 0) return fd; + pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY); + if (pid_fd < 0) + pid_fd = check_pidfile(); + if (pid_fd < 0) + return pid_fd; if (!stay_foreground) { @@ -2199,7 +2418,7 @@ static int daemonize (void) /* {{{ */ return (-1); } - status = write_pidfile (fd); + status = write_pidfile (pid_fd); return status; } /* }}} int daemonize */ @@ -2315,6 +2534,7 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'b': { size_t len; + char base_realpath[PATH_MAX]; if (config_base_dir != NULL) free (config_base_dir); @@ -2325,6 +2545,27 @@ static int read_options (int argc, char **argv) /* {{{ */ return (3); } + /* make sure that the base directory is not resolved via + * symbolic links. this makes some performance-enhancing + * assumptions possible (we don't have to resolve paths + * that start with a "/") + */ + if (realpath(config_base_dir, base_realpath) == NULL) + { + fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir); + return 5; + } + else if (strncmp(config_base_dir, + base_realpath, sizeof(base_realpath)) != 0) + { + fprintf(stderr, + "Base directory (-b) resolved via file system links!\n" + "Please consult rrdcached '-b' documentation!\n" + "Consider specifying the real directory (%s)\n", + base_realpath); + return 5; + } + len = strlen (config_base_dir); while ((len > 0) && (config_base_dir[len - 1] == '/')) {