X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=9c8847dd7e4a17ce157853a02ff2440d5354dce0;hp=03dd181f76189a60945c89c7aaf59de8f6c222ec;hb=db9842391d7f238faf7b5ad1b01059115a4f2264;hpb=1164122dace49986850172ef2cb198dc301fe750 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 03dd181..9c8847d 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -116,6 +116,10 @@ struct listen_socket_s int family; socket_privilege privilege; + /* state for BATCH processing */ + int batch_mode; + int batch_cmd; + /* buffered IO */ char *rbuf; off_t next_cmd; @@ -166,6 +170,7 @@ typedef enum queue_side_e queue_side_t; * Variables */ static int stay_foreground = 0; +static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; @@ -399,6 +404,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ int len; if (sock == NULL) return 0; /* journal replay mode */ + if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); #ifdef HAVE_VSNPRINTF @@ -446,10 +452,14 @@ static int send_response (listen_socket_t *sock, response_code rc, if (sock == NULL) return rc; /* journal replay mode */ - if (rc == RESP_OK) + if (sock->batch_mode) { - lines = count_lines(sock->wbuf); + if (rc == RESP_OK) + return rc; /* no response on success during BATCH */ + lines = sock->batch_cmd; } + else if (rc == RESP_OK) + lines = count_lines(sock->wbuf); else lines = -1; @@ -466,6 +476,10 @@ static int send_response (listen_socket_t *sock, response_code rc, len += rclen; + /* append the result to the wbuf, don't write to the user */ + if (sock->batch_mode) + return add_to_wbuf(sock, buffer, len); + /* first write must be complete */ if (len != write(sock->fd, buffer, len)) { @@ -526,6 +540,34 @@ static void remove_from_queue(cache_item_t *ci) /* {{{ */ ci->flags &= ~CI_FLAGS_IN_QUEUE; } /* }}} static void remove_from_queue */ +/* remove an entry from the tree and free all its resources. + * must hold 'cache lock' while calling this. + * returns 0 on success, otherwise errno */ +static int forget_file(const char *file) +{ + cache_item_t *ci; + + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) + return ENOENT; + + g_tree_remove (cache_tree, file); + remove_from_queue(ci); + + for (int i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free (ci->values); + free (ci->file); + + /* in case anyone is waiting */ + pthread_cond_broadcast(&ci->flushed); + + free (ci); + + return 0; +} /* }}} static int forget_file */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -660,26 +702,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { - cache_item_t *ci; - - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ + /* should never fail, since we have held the cache_lock + * the entire time */ + assert( forget_file(cfd.keys[k]) == 0 ); + } if (cfd.keys != NULL) { @@ -929,6 +955,20 @@ err: return 0; } /* }}} static int check_file_access */ +/* returns 1 if we have the required privilege level, + * otherwise issue an error to the user on sock */ +static int has_privilege (listen_socket_t *sock, /* {{{ */ + socket_privilege priv) +{ + if (sock == NULL) /* journal replay */ + return 1; + + if (sock->privilege >= priv) + return 1; + + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); +} /* }}} static int has_privilege */ + static int flush_file (const char *filename) /* {{{ */ { cache_item_t *ci; @@ -949,6 +989,9 @@ static int flush_file (const char *filename) /* {{{ */ pthread_cond_wait(&ci->flushed, &cache_lock); } + /* DO NOT DO ANYTHING WITH ci HERE!! The entry + * may have been purged during our cond_wait() */ + pthread_mutex_unlock(&cache_lock); return (0); @@ -965,10 +1008,13 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ { "Command overview\n" , + "HELP []\n" "FLUSH \n" "FLUSHALL\n" - "HELP []\n" + "PENDING \n" + "FORGET \n" "UPDATE [ ...]\n" + "BATCH\n" "STATS\n" }; @@ -991,6 +1037,26 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ "Triggers writing of all pending updates. Returns immediately.\n" }; + char *help_pending[2] = + { + "Help for PENDING\n" + , + "Usage: PENDING \n" + "\n" + "Shows any 'pending' updates for a file, in order.\n" + "The updates shown have not yet been written to the underlying RRD file.\n" + }; + + char *help_forget[2] = + { + "Help for FORGET\n" + , + "Usage: FORGET \n" + "\n" + "Removes the file completely from the cache.\n" + "Any pending updates for the file will be lost.\n" + }; + char *help_update[2] = { "Help for UPDATE\n" @@ -1016,6 +1082,28 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ "a description of the values.\n" }; + char *help_batch[2] = + { + "Help for BATCH\n" + , + "The 'BATCH' command permits the client to initiate a bulk load\n" + " of commands to rrdcached.\n" + "\n" + "Usage:\n" + "\n" + " client: BATCH\n" + " server: 0 Go ahead. End with dot '.' on its own line.\n" + " client: command #1\n" + " client: command #2\n" + " client: ... and so on\n" + " client: .\n" + " server: 2 errors\n" + " server: 7 message for command #7\n" + " server: 9 message for command #9\n" + "\n" + "For more information, consult the rrdcached(1) documentation.\n" + }; + status = buffer_get_field (&buffer, &buffer_size, &command); if (status != 0) help_text = help_help; @@ -1027,8 +1115,14 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ help_text = help_flush; else if (strcasecmp (command, "flushall") == 0) help_text = help_flushall; + else if (strcasecmp (command, "pending") == 0) + help_text = help_pending; + else if (strcasecmp (command, "forget") == 0) + help_text = help_forget; else if (strcasecmp (command, "stats") == 0) help_text = help_stats; + else if (strcasecmp (command, "batch") == 0) + help_text = help_batch; else help_text = help_help; } @@ -1130,6 +1224,11 @@ static int handle_request_flush (listen_socket_t *sock, /* {{{ */ static int handle_request_flushall(listen_socket_t *sock) /* {{{ */ { + int status; + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; RRDD_LOG(LOG_DEBUG, "Received FLUSHALL"); @@ -1140,18 +1239,93 @@ static int handle_request_flushall(listen_socket_t *sock) /* {{{ */ return send_response(sock, RESP_OK, "Started flush.\n"); } /* }}} static int handle_request_flushall */ +static int handle_request_pending(listen_socket_t *sock, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char *file; + cache_item_t *ci; + + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return send_response(sock, RESP_ERR, + "Usage: PENDING \n"); + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) + { + pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); + } + + for (int i=0; i < ci->values_num; i++) + add_response_info(sock, "%s\n", ci->values[i]); + + pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_OK, "updates pending\n"); +} /* }}} static int handle_request_pending */ + +static int handle_request_forget(listen_socket_t *sock, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char *file; + + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return send_response(sock, RESP_ERR, + "Usage: FORGET \n"); + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + if (!check_file_access(file, sock)) return 0; + + pthread_mutex_lock(&cache_lock); + status = forget_file(file); + pthread_mutex_unlock(&cache_lock); + + if (status == 0) + { + if (sock != NULL) + journal_write("forget", file); + + return send_response(sock, RESP_OK, "Gone!\n"); + } + else + return send_response(sock, RESP_ERR, "cannot forget: %s\n", + status < 0 ? "Internal error" : rrd_strerror(status)); + + /* NOTREACHED */ + assert(1==0); +} /* }}} static int handle_request_forget */ + static int handle_request_update (listen_socket_t *sock, /* {{{ */ char *buffer, size_t buffer_size) { char *file; int values_num = 0; int status; + char orig_buf[CMD_MAX]; time_t now; cache_item_t *ci; now = time (NULL); + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + /* save it for the journal later */ + strncpy(orig_buf, buffer, sizeof(orig_buf)-1); + status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) return send_response(sock, RESP_ERR, @@ -1219,6 +1393,10 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ } /* }}} */ assert (ci != NULL); + /* don't re-write updates in replay mode */ + if (sock != NULL) + journal_write("update", orig_buf); + while (buffer_size > 0) { char **temp; @@ -1303,18 +1481,29 @@ static int handle_request_wrote (const char *buffer) /* {{{ */ return (0); } /* }}} int handle_request_wrote */ -/* returns 1 if we have the required privilege level */ -static int has_privilege (listen_socket_t *sock, /* {{{ */ - socket_privilege priv) +/* start "BATCH" processing */ +static int batch_start (listen_socket_t *sock) /* {{{ */ { - if (sock == NULL) /* journal replay */ - return 1; + int status; + if (sock->batch_mode) + return send_response(sock, RESP_ERR, "Already in BATCH\n"); - if (sock->privilege >= priv) - return 1; + status = send_response(sock, RESP_OK, + "Go ahead. End with dot '.' on its own line.\n"); + sock->batch_mode = 1; + sock->batch_cmd = 0; - return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); -} /* }}} static int has_privilege */ + return status; +} /* }}} static int batch_start */ + +/* finish "BATCH" processing and return results to the client */ +static int batch_done (listen_socket_t *sock) /* {{{ */ +{ + assert(sock->batch_mode); + sock->batch_mode = 0; + sock->batch_cmd = 0; + return send_response(sock, RESP_OK, "errors\n"); +} /* }}} static int batch_done */ /* if sock==NULL, we are in journal replay mode */ static int handle_request (listen_socket_t *sock, /* {{{ */ @@ -1335,18 +1524,11 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (-1); } - if (strcasecmp (command, "update") == 0) - { - status = has_privilege(sock, PRIV_HIGH); - if (status <= 0) - return status; - - /* don't re-write updates in replay mode */ - if (sock != NULL) - journal_write(command, buffer_ptr); + if (sock != NULL && sock->batch_mode) + sock->batch_cmd++; + if (strcasecmp (command, "update") == 0) return (handle_request_update (sock, buffer_ptr, buffer_size)); - } else if (strcasecmp (command, "wrote") == 0 && sock == NULL) { /* this is only valid in replay mode */ @@ -1355,17 +1537,19 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ else if (strcasecmp (command, "flush") == 0) return (handle_request_flush (sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "flushall") == 0) - { - status = has_privilege(sock, PRIV_HIGH); - if (status <= 0) - return status; - return (handle_request_flushall(sock)); - } + else if (strcasecmp (command, "pending") == 0) + return (handle_request_pending(sock, buffer_ptr, buffer_size)); + else if (strcasecmp (command, "forget") == 0) + return (handle_request_forget(sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "stats") == 0) return (handle_request_stats (sock)); else if (strcasecmp (command, "help") == 0) return (handle_request_help (sock, buffer_ptr, buffer_size)); + else if (strcasecmp (command, "batch") == 0 && sock != NULL) + return batch_start(sock); + else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode) + return batch_done(sock); else return send_response(sock, RESP_ERR, "Unknown command: %s\n", command); @@ -1377,6 +1561,7 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ static void journal_rotate(void) /* {{{ */ { FILE *old_fh = NULL; + int new_fd; if (journal_cur == NULL || journal_old == NULL) return; @@ -1391,11 +1576,20 @@ static void journal_rotate(void) /* {{{ */ if (journal_fh != NULL) { old_fh = journal_fh; + journal_fh = NULL; rename(journal_cur, journal_old); ++stats_journal_rotate; } - journal_fh = fopen(journal_cur, "a"); + new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND, + S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (new_fd >= 0) + { + journal_fh = fdopen(new_fd, "a"); + if (journal_fh == NULL) + close(new_fd); + } + pthread_mutex_unlock(&journal_lock); if (old_fh != NULL) @@ -1473,6 +1667,44 @@ static int journal_replay (const char *file) /* {{{ */ if (file == NULL) return 0; + { + char *reason; + int status = 0; + struct stat statbuf; + + memset(&statbuf, 0, sizeof(statbuf)); + if (stat(file, &statbuf) != 0) + { + if (errno == ENOENT) + return 0; + + reason = "stat error"; + status = errno; + } + else if (!S_ISREG(statbuf.st_mode)) + { + reason = "not a regular file"; + status = EPERM; + } + if (statbuf.st_uid != daemon_uid) + { + reason = "not owned by daemon user"; + status = EACCES; + } + if (statbuf.st_mode & (S_IWGRP|S_IWOTH)) + { + reason = "must not be user/group writable"; + status = EACCES; + } + + if (status != 0) + { + RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)", + file, rrd_strerror(status), reason); + return 0; + } + } + fh = fopen(file, "r"); if (fh == NULL) { @@ -1513,17 +1745,36 @@ static int journal_replay (const char *file) /* {{{ */ fclose(fh); - if (entry_cnt > 0) - { - RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)", - entry_cnt, fail_cnt); - return 1; - } - else - return 0; + RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)", + entry_cnt, fail_cnt); + return entry_cnt > 0 ? 1 : 0; } /* }}} static int journal_replay */ +static void journal_init(void) /* {{{ */ +{ + int had_journal = 0; + + if (journal_cur == NULL) return; + + pthread_mutex_lock(&journal_lock); + + RRDD_LOG(LOG_INFO, "checking for journal files"); + + had_journal += journal_replay(journal_old); + had_journal += journal_replay(journal_cur); + + /* it must have been a crash. start a flush */ + if (had_journal && config_flush_at_shutdown) + flush_old_values(-1); + + pthread_mutex_unlock(&journal_lock); + journal_rotate(); + + RRDD_LOG(LOG_INFO, "journal processing complete"); + +} /* }}} static void journal_init */ + static void close_connection(listen_socket_t *sock) { close(sock->fd) ; sock->fd = -1; @@ -2006,6 +2257,8 @@ static int daemonize (void) /* {{{ */ int fd; char *base_dir; + daemon_uid = geteuid(); + fd = open_pidfile(); if (fd < 0) return fd; @@ -2330,25 +2583,7 @@ int main (int argc, char **argv) return (1); } - if (journal_cur != NULL) - { - int had_journal = 0; - - pthread_mutex_lock(&journal_lock); - - RRDD_LOG(LOG_INFO, "checking for journal files"); - - had_journal += journal_replay(journal_old); - had_journal += journal_replay(journal_cur); - - if (had_journal) - flush_old_values(-1); - - pthread_mutex_unlock(&journal_lock); - journal_rotate(); - - RRDD_LOG(LOG_INFO, "journal processing complete"); - } + journal_init(); /* start the queue thread */ memset (&queue_thread, 0, sizeof (queue_thread));