X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=248095899b4c4ea2f36b0af562eb1cee07e7b5d4;hb=bc1d8aeea1af6821c1530188d8bd1cb465ad8285;hp=372adee9deea8e24f41160b794fbb34370fcd6b3;hpb=4a15d03375160bbd7df66a4d754c37a206d60522;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 372adee..2480958 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,6 +1,6 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008,2009 Florian octo Forster + * Copyright (C) 2008-2010 Florian octo Forster * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it @@ -111,7 +111,12 @@ #include /* }}} */ -#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) +#define RRDD_LOG(severity, ...) \ + do { \ + if (stay_foreground) \ + fprintf(stderr, __VA_ARGS__); \ + syslog ((severity), __VA_ARGS__); \ + } while (0) #ifndef __GNUC__ # define __attribute__(x) /**/ @@ -142,7 +147,8 @@ struct listen_socket_s uint32_t permissions; - gid_t socket_group; + gid_t socket_group; + mode_t socket_permissions; }; typedef struct listen_socket_s listen_socket_t; @@ -177,7 +183,8 @@ struct cache_item_s { char *file; char **values; - size_t values_num; + size_t values_num; /* number of valid pointers */ + size_t values_alloc; /* number of allocated pointers */ time_t last_flush_time; time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) @@ -254,6 +261,7 @@ static char *config_pid_file = NULL; static char *config_base_dir = NULL; static size_t _config_base_dir_len = 0; static int config_write_base_only = 0; +static size_t config_alloc_chunk = 1; static listen_socket_t **config_listen_address_list = NULL; static size_t config_listen_address_list_len = 0; @@ -641,6 +649,7 @@ static void wipe_ci_values(cache_item_t *ci, time_t when) { ci->values = NULL; ci->values_num = 0; + ci->values_alloc = 0; ci->last_flush_time = when; if (config_write_jitter > 0) @@ -820,9 +829,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { + gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); /* should never fail, since we have held the cache_lock * the entire time */ - assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); + assert(status == TRUE); } if (cfd.keys != NULL) @@ -1437,7 +1447,8 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ else ci->last_update_stamp = stamp; - if (!rrd_add_strdup(&ci->values, &ci->values_num, value)) + if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value, + &ci->values_alloc, config_alloc_chunk)) { RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed."); continue; @@ -1466,6 +1477,176 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file; + char *cf; + + char *start_str; + char *end_str; + rrd_time_value_t start_tv; + rrd_time_value_t end_tv; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + /* Parse start time */ + if (start_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (start_str, &start_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': %s\n", start_str, errmsg)); + } + else + rrd_parsetime ("-86400", &start_tv); + + /* Parse end time */ + if (end_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (end_str, &end_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': %s\n", end_str, errmsg)); + } + else + rrd_parsetime ("now", &end_tv); + + start_tm = 0; + end_tm = 0; + status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_proc_start_end failed: %s\n", rrd_get_error ())); + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + } + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ @@ -1638,6 +1819,14 @@ static command_t list_of_commands[] = { /* {{{ */ NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { "QUIT", handle_request_quit, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, @@ -2339,6 +2528,13 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ } } + if (sock->socket_permissions != (mode_t)-1) + { + if (chmod(path, sock->socket_permissions) != 0) + fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n", + (unsigned int)sock->socket_permissions, strerror(errno)); + } + status = listen (fd, /* backlog = */ 10); if (status != 0) { @@ -2759,9 +2955,10 @@ static int read_options (int argc, char **argv) /* {{{ */ char **permissions = NULL; size_t permissions_len = 0; - gid_t socket_group = (gid_t)-1; + gid_t socket_group = (gid_t)-1; + mode_t socket_permissions = (mode_t)-1; - while ((option = getopt(argc, argv, "gl:s:P:f:w:z:t:Bb:p:Fj:h?")) != -1) + while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1) { switch (option) { @@ -2818,6 +3015,7 @@ static int read_options (int argc, char **argv) /* {{{ */ /* }}} Done adding permissions. */ new->socket_group = socket_group; + new->socket_permissions = socket_permissions; if (!rrd_add_ptr((void ***)&config_listen_address_list, &config_listen_address_list_len, new)) @@ -2858,6 +3056,24 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + /* set socket file permissions */ + case 'm': + { + long tmp; + char *endptr = NULL; + + tmp = strtol (optarg, &endptr, 8); + if ((endptr == optarg) || (! endptr) || (*endptr != '\0') + || (tmp > 07777) || (tmp < 0)) { + fprintf (stderr, "read_options: Invalid file mode \"%s\".\n", + optarg); + return (5); + } + + socket_permissions = (mode_t)tmp; + } + break; + case 'P': { char *optcopy; @@ -3050,6 +3266,19 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'a': + { + int temp = atoi(optarg); + if (temp > 0) + config_alloc_chunk = temp; + else + { + fprintf(stderr, "Invalid allocation size: %s\n", optarg); + return 10; + } + } + break; + case 'h': case '?': printf ("RRDCacheD %s\n" @@ -3071,13 +3300,21 @@ static int read_options (int argc, char **argv) /* {{{ */ " -g Do not fork and run in the foreground.\n" " -j Directory in which to create the journal files.\n" " -F Always flush all updates at shutdown\n" - " -s Make socket g+rw to named group\n" + " -s Group owner of all following UNIX sockets\n" + " (the socket will also have read/write permissions " + "for that group)\n" + " -m File permissions (octal) of all following UNIX " + "sockets\n" + " -a Memory allocation chunk size. Default is 1." "\n" "For more information and a detailed description of all options " "please refer\n" "to the rrdcached(1) manual page.\n", VERSION); - status = -1; + if (option == 'h') + status = -1; + else + status = 1; break; } /* switch (option) */ } /* while (getopt) */