d2922813e099083dd58d3788fd5b1f8608c9c7ff
[rrdtool.git] / src / rrd_client.c
1 /**
2  * RRDTool - src/rrd_client.c
3  * Copyright (C) 2008-2010  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy
6  * of this software and associated documentation files (the "Software"), to
7  * deal in the Software without restriction, including without limitation the
8  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9  * sell copies of the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  * 
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  * 
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21  * IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at verplant.org>
25  *   Sebastian tokkee Harl <sh at tokkee.org>
26  **/
27
28 #include "rrd.h"
29 #include "rrd_tool.h"
30 #include "rrd_client.h"
31
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <pthread.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/un.h>
42 #include <netdb.h>
43 #include <limits.h>
44
45 #ifndef ENODATA
46 #define ENODATA ENOENT
47 #endif
48
49 struct rrdc_response_s
50 {
51   int status;
52   char *message;
53   char **lines;
54   size_t lines_num;
55 };
56 typedef struct rrdc_response_s rrdc_response_t;
57
58 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
59 static int sd = -1;
60 static FILE *sh = NULL;
61 static char *sd_path = NULL; /* cache the path for sd */
62
63 /* get_path: Return a path name appropriate to be sent to the daemon.
64  *
65  * When talking to a local daemon (thru a UNIX socket), relative path names
66  * are resolved to absolute path names to allow for transparent integration
67  * into existing solutions (as requested by Tobi). Else, absolute path names
68  * are not allowed, since path name translation is done by the server.
69  *
70  * One must hold `lock' when calling this function. */
71 static const char *get_path (const char *path, char *resolved_path) /* {{{ */
72 {
73   const char *ret = path;
74   int is_unix = 0;
75
76   if ((*sd_path == '/')
77       || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0))
78     is_unix = 1;
79
80   if (is_unix)
81   {
82     ret = realpath(path, resolved_path);
83     if (ret == NULL)
84       rrd_set_error("realpath(%s): %s", path, rrd_strerror(errno));
85     return ret;
86   }
87   else
88   {
89     if (*path == '/') /* not absolute path */
90     {
91       rrd_set_error ("absolute path names not allowed when talking "
92           "to a remote daemon");
93       return NULL;
94     }
95   }
96
97   return path;
98 } /* }}} char *get_path */
99
100 static size_t strsplit (char *string, char **fields, size_t size) /* {{{ */
101 {
102   size_t i;
103   char *ptr;
104   char *saveptr;
105
106   i = 0;
107   ptr = string;
108   saveptr = NULL;
109   while ((fields[i] = strtok_r (ptr, " \t\r\n", &saveptr)) != NULL)
110   {
111     ptr = NULL;
112     i++;
113
114     if (i >= size)
115       break;
116   }
117
118   return (i);
119 } /* }}} size_t strsplit */
120
121 static int parse_header (char *line, /* {{{ */
122     char **ret_key, char **ret_value)
123 {
124   char *tmp;
125
126   *ret_key = line;
127
128   tmp = strchr (line, ':');
129   if (tmp == NULL)
130     return (-1);
131
132   do
133   {
134     *tmp = 0;
135     tmp++;
136   }
137   while ((tmp[0] == ' ') || (tmp[0] == '\t'));
138
139   if (*tmp == 0)
140     return (-1);
141
142   *ret_value = tmp;
143   return (0);
144 } /* }}} int parse_header */
145
146 static int parse_ulong_header (char *line, /* {{{ */
147     char **ret_key, unsigned long *ret_value)
148 {
149   char *str_value;
150   char *endptr;
151   int status;
152
153   str_value = NULL;
154   status = parse_header (line, ret_key, &str_value);
155   if (status != 0)
156     return (status);
157
158   endptr = NULL;
159   errno = 0;
160   *ret_value = (unsigned long) strtol (str_value, &endptr, /* base = */ 0);
161   if ((endptr == str_value) || (errno != 0))
162     return (-1);
163
164   return (0);
165 } /* }}} int parse_ulong_header */
166
167 static int parse_char_array_header (char *line, /* {{{ */
168     char **ret_key, char **array, size_t array_len, int alloc)
169 {
170   char *tmp_array[array_len];
171   char *value;
172   size_t num;
173   int status;
174
175   value = NULL;
176   status = parse_header (line, ret_key, &value);
177   if (status != 0)
178     return (-1);
179
180   num = strsplit (value, tmp_array, array_len);
181   if (num != array_len)
182     return (-1);
183
184   if (alloc == 0)
185   {
186     memcpy (array, tmp_array, sizeof (tmp_array));
187   }
188   else
189   {
190     size_t i;
191
192     for (i = 0; i < array_len; i++)
193       array[i] = strdup (tmp_array[i]);
194   }
195
196   return (0);
197 } /* }}} int parse_char_array_header */
198
199 static int parse_value_array_header (char *line, /* {{{ */
200     time_t *ret_time, rrd_value_t *array, size_t array_len)
201 {
202   char *str_key;
203   char *str_array[array_len];
204   char *endptr;
205   int status;
206   size_t i;
207
208   str_key = NULL;
209   status = parse_char_array_header (line, &str_key,
210       str_array, array_len, /* alloc = */ 0);
211   if (status != 0)
212     return (-1);
213
214   errno = 0;
215   endptr = NULL;
216   *ret_time = (time_t) strtol (str_key, &endptr, /* base = */ 10);
217   if ((endptr == str_key) || (errno != 0))
218     return (-1);
219
220   for (i = 0; i < array_len; i++)
221   {
222     endptr = NULL;
223     array[i] = (rrd_value_t) strtod (str_array[i], &endptr);
224     if ((endptr == str_array[i]) || (errno != 0))
225       return (-1);
226   }
227
228   return (0);
229 } /* }}} int parse_value_array_header */
230
231 /* One must hold `lock' when calling `close_connection'. */
232 static void close_connection (void) /* {{{ */
233 {
234   if (sh != NULL)
235   {
236     fclose (sh);
237     sh = NULL;
238     sd = -1;
239   }
240   else if (sd >= 0)
241   {
242     close (sd);
243     sd = -1;
244   }
245
246   if (sd_path != NULL)
247     free (sd_path);
248   sd_path = NULL;
249 } /* }}} void close_connection */
250
251 static int buffer_add_string (const char *str, /* {{{ */
252     char **buffer_ret, size_t *buffer_size_ret)
253 {
254   char *buffer;
255   size_t buffer_size;
256   size_t buffer_pos;
257   size_t i;
258   int status;
259
260   buffer = *buffer_ret;
261   buffer_size = *buffer_size_ret;
262   buffer_pos = 0;
263
264   i = 0;
265   status = -1;
266   while (buffer_pos < buffer_size)
267   {
268     if (str[i] == 0)
269     {
270       buffer[buffer_pos] = ' ';
271       buffer_pos++;
272       status = 0;
273       break;
274     }
275     else if ((str[i] == ' ') || (str[i] == '\\'))
276     {
277       if (buffer_pos >= (buffer_size - 1))
278         break;
279       buffer[buffer_pos] = '\\';
280       buffer_pos++;
281       buffer[buffer_pos] = str[i];
282       buffer_pos++;
283     }
284     else
285     {
286       buffer[buffer_pos] = str[i];
287       buffer_pos++;
288     }
289     i++;
290   } /* while (buffer_pos < buffer_size) */
291
292   if (status != 0)
293     return (-1);
294
295   *buffer_ret = buffer + buffer_pos;
296   *buffer_size_ret = buffer_size - buffer_pos;
297
298   return (0);
299 } /* }}} int buffer_add_string */
300
301 static int buffer_add_value (const char *value, /* {{{ */
302     char **buffer_ret, size_t *buffer_size_ret)
303 {
304   char temp[4096];
305
306   if (strncmp (value, "N:", 2) == 0)
307     snprintf (temp, sizeof (temp), "%lu:%s",
308         (unsigned long) time (NULL), value + 2);
309   else
310     strncpy (temp, value, sizeof (temp));
311   temp[sizeof (temp) - 1] = 0;
312
313   return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
314 } /* }}} int buffer_add_value */
315
316 /* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
317  * the Perl function `chomp'. Returns the number of characters that have been
318  * removed. */
319 static int chomp (char *str) /* {{{ */
320 {
321   size_t len;
322   int removed;
323
324   if (str == NULL)
325     return (-1);
326
327   len = strlen (str);
328   removed = 0;
329   while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
330   {
331     str[len - 1] = 0;
332     len--;
333     removed++;
334   }
335
336   return (removed);
337 } /* }}} int chomp */
338
339 static void response_free (rrdc_response_t *res) /* {{{ */
340 {
341   if (res == NULL)
342     return;
343
344   if (res->lines != NULL)
345   {
346     size_t i;
347
348     for (i = 0; i < res->lines_num; i++)
349       if (res->lines[i] != NULL)
350         free (res->lines[i]);
351     free (res->lines);
352   }
353
354   free (res);
355 } /* }}} void response_free */
356
357 static int response_read (rrdc_response_t **ret_response) /* {{{ */
358 {
359   rrdc_response_t *ret;
360
361   char buffer[4096];
362   char *buffer_ptr;
363
364   size_t i;
365
366   if (sh == NULL)
367     return (-1);
368
369   ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
370   if (ret == NULL)
371     return (-2);
372   memset (ret, 0, sizeof (*ret));
373   ret->lines = NULL;
374   ret->lines_num = 0;
375
376   buffer_ptr = fgets (buffer, sizeof (buffer), sh);
377   if (buffer_ptr == NULL) {
378     close_connection();
379     return (-3);
380   }
381   chomp (buffer);
382
383   ret->status = strtol (buffer, &ret->message, 0);
384   if (buffer == ret->message)
385   {
386     response_free (ret);
387     close_connection();
388     return (-4);
389   }
390   /* Skip leading whitespace of the status message */
391   ret->message += strspn (ret->message, " \t");
392
393   if (ret->status <= 0)
394   {
395     if (ret->status < 0)
396       rrd_set_error("rrdcached: %s", ret->message);
397     *ret_response = ret;
398     return (0);
399   }
400
401   ret->lines = (char **) malloc (sizeof (char *) * ret->status);
402   if (ret->lines == NULL)
403   {
404     response_free (ret);
405     close_connection();
406     return (-5);
407   }
408   memset (ret->lines, 0, sizeof (char *) * ret->status);
409   ret->lines_num = (size_t) ret->status;
410
411   for (i = 0; i < ret->lines_num; i++)
412   {
413     buffer_ptr = fgets (buffer, sizeof (buffer), sh);
414     if (buffer_ptr == NULL)
415     {
416       response_free (ret);
417       close_connection();
418       return (-6);
419     }
420     chomp (buffer);
421
422     ret->lines[i] = strdup (buffer);
423     if (ret->lines[i] == NULL)
424     {
425       response_free (ret);
426       close_connection();
427       return (-7);
428     }
429   }
430
431   *ret_response = ret;
432   return (0);
433 } /* }}} rrdc_response_t *response_read */
434
435 static int request (const char *buffer, size_t buffer_size, /* {{{ */
436     rrdc_response_t **ret_response)
437 {
438   int status;
439   rrdc_response_t *res;
440
441   if (sh == NULL)
442     return (ENOTCONN);
443
444   status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
445   if (status != 1)
446   {
447     close_connection ();
448     rrd_set_error("request: socket error (%d) while talking to rrdcached",
449                   status);
450     return (-1);
451   }
452   fflush (sh);
453
454   res = NULL;
455   status = response_read (&res);
456
457   if (status != 0)
458   {
459     if (status < 0)
460       rrd_set_error("request: internal error while talking to rrdcached");
461     return (status);
462   }
463
464   *ret_response = res;
465   return (0);
466 } /* }}} int request */
467
468 /* determine whether we are connected to the specified daemon_addr if
469  * NULL, return whether we are connected at all
470  */
471 int rrdc_is_connected(const char *daemon_addr) /* {{{ */
472 {
473   if (sd < 0)
474     return 0;
475   else if (daemon_addr == NULL)
476   {
477     /* here we have to handle the case i.e.
478      *   UPDATE --daemon ...; UPDATEV (no --daemon) ...
479      * In other words: we have a cached connection,
480      * but it is not specified in the current command.
481      * Daemon is only implied in this case if set in ENV
482      */
483     if (getenv(ENV_RRDCACHED_ADDRESS) != NULL)
484       return 1;
485     else
486       return 0;
487   }
488   else if (strcmp(daemon_addr, sd_path) == 0)
489     return 1;
490   else
491     return 0;
492
493 } /* }}} int rrdc_is_connected */
494
495 static int rrdc_connect_unix (const char *path) /* {{{ */
496 {
497   struct sockaddr_un sa;
498   int status;
499
500   assert (path != NULL);
501   assert (sd == -1);
502
503   sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
504   if (sd < 0)
505   {
506     status = errno;
507     return (status);
508   }
509
510   memset (&sa, 0, sizeof (sa));
511   sa.sun_family = AF_UNIX;
512   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
513
514   status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
515   if (status != 0)
516   {
517     status = errno;
518     close_connection ();
519     return (status);
520   }
521
522   sh = fdopen (sd, "r+");
523   if (sh == NULL)
524   {
525     status = errno;
526     close_connection ();
527     return (status);
528   }
529
530   return (0);
531 } /* }}} int rrdc_connect_unix */
532
533 static int rrdc_connect_network (const char *addr_orig) /* {{{ */
534 {
535   struct addrinfo ai_hints;
536   struct addrinfo *ai_res;
537   struct addrinfo *ai_ptr;
538   char addr_copy[NI_MAXHOST];
539   char *addr;
540   char *port;
541
542   assert (addr_orig != NULL);
543   assert (sd == -1);
544
545   strncpy(addr_copy, addr_orig, sizeof(addr_copy));
546   addr_copy[sizeof(addr_copy) - 1] = '\0';
547   addr = addr_copy;
548
549   int status;
550   memset (&ai_hints, 0, sizeof (ai_hints));
551   ai_hints.ai_flags = 0;
552 #ifdef AI_ADDRCONFIG
553   ai_hints.ai_flags |= AI_ADDRCONFIG;
554 #endif
555   ai_hints.ai_family = AF_UNSPEC;
556   ai_hints.ai_socktype = SOCK_STREAM;
557
558   port = NULL;
559   if (*addr == '[') /* IPv6+port format */
560   {
561     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
562     addr++;
563
564     port = strchr (addr, ']');
565     if (port == NULL)
566     {
567       rrd_set_error("malformed address: %s", addr_orig);
568       return (-1);
569     }
570     *port = 0;
571     port++;
572
573     if (*port == ':')
574       port++;
575     else if (*port == 0)
576       port = NULL;
577     else
578     {
579       rrd_set_error("garbage after address: %s", port);
580       return (-1);
581     }
582   } /* if (*addr == '[') */
583   else
584   {
585     port = rindex(addr, ':');
586     if (port != NULL)
587     {
588       *port = 0;
589       port++;
590     }
591   }
592
593   ai_res = NULL;
594   status = getaddrinfo (addr,
595                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
596                         &ai_hints, &ai_res);
597   if (status != 0)
598   {
599     rrd_set_error ("failed to resolve address `%s' (port %s): %s",
600         addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port,
601         gai_strerror (status));
602     return (-1);
603   }
604
605   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
606   {
607     sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
608     if (sd < 0)
609     {
610       status = errno;
611       sd = -1;
612       continue;
613     }
614
615     status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
616     if (status != 0)
617     {
618       status = errno;
619       close_connection();
620       continue;
621     }
622
623     sh = fdopen (sd, "r+");
624     if (sh == NULL)
625     {
626       status = errno;
627       close_connection ();
628       continue;
629     }
630
631     assert (status == 0);
632     break;
633   } /* for (ai_ptr) */
634
635   return (status);
636 } /* }}} int rrdc_connect_network */
637
638 int rrdc_connect (const char *addr) /* {{{ */
639 {
640   int status = 0;
641
642   if (addr == NULL)
643     addr = getenv (ENV_RRDCACHED_ADDRESS);
644
645   if (addr == NULL)
646     return 0;
647
648   pthread_mutex_lock(&lock);
649
650   if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
651   {
652     /* connection to the same daemon; use cached connection */
653     pthread_mutex_unlock (&lock);
654     return (0);
655   }
656   else
657   {
658     close_connection();
659   }
660
661   rrd_clear_error ();
662   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
663     status = rrdc_connect_unix (addr + strlen ("unix:"));
664   else if (addr[0] == '/')
665     status = rrdc_connect_unix (addr);
666   else
667     status = rrdc_connect_network(addr);
668
669   if (status == 0 && sd >= 0)
670     sd_path = strdup(addr);
671   else
672   {
673     char *err = rrd_test_error () ? rrd_get_error () : "Internal error";
674     /* err points the string that gets written to by rrd_set_error(), thus we
675      * cannot pass it to that function */
676     err = strdup (err);
677     rrd_set_error("Unable to connect to rrdcached: %s",
678                   (status < 0)
679                   ? (err ? err : "Internal error")
680                   : rrd_strerror (status));
681     if (err != NULL)
682       free (err);
683   }
684
685   pthread_mutex_unlock (&lock);
686   return (status);
687 } /* }}} int rrdc_connect */
688
689 int rrdc_disconnect (void) /* {{{ */
690 {
691   pthread_mutex_lock (&lock);
692
693   close_connection();
694
695   pthread_mutex_unlock (&lock);
696
697   return (0);
698 } /* }}} int rrdc_disconnect */
699
700 int rrdc_update (const char *filename, int values_num, /* {{{ */
701                 const char * const *values)
702 {
703   char buffer[4096];
704   char *buffer_ptr;
705   size_t buffer_free;
706   size_t buffer_size;
707   rrdc_response_t *res;
708   int status;
709   int i;
710   char file_path[PATH_MAX];
711
712   memset (buffer, 0, sizeof (buffer));
713   buffer_ptr = &buffer[0];
714   buffer_free = sizeof (buffer);
715
716   status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
717   if (status != 0)
718     return (ENOBUFS);
719
720   pthread_mutex_lock (&lock);
721   filename = get_path (filename, file_path);
722   if (filename == NULL)
723   {
724     pthread_mutex_unlock (&lock);
725     return (-1);
726   }
727
728   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
729   if (status != 0)
730   {
731     pthread_mutex_unlock (&lock);
732     return (ENOBUFS);
733   }
734
735   for (i = 0; i < values_num; i++)
736   {
737     status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
738     if (status != 0)
739     {
740       pthread_mutex_unlock (&lock);
741       return (ENOBUFS);
742     }
743   }
744
745   assert (buffer_free < sizeof (buffer));
746   buffer_size = sizeof (buffer) - buffer_free;
747   assert (buffer[buffer_size - 1] == ' ');
748   buffer[buffer_size - 1] = '\n';
749
750   res = NULL;
751   status = request (buffer, buffer_size, &res);
752   pthread_mutex_unlock (&lock);
753
754   if (status != 0)
755     return (status);
756
757   status = res->status;
758   response_free (res);
759
760   return (status);
761 } /* }}} int rrdc_update */
762
763 int rrdc_flush (const char *filename) /* {{{ */
764 {
765   char buffer[4096];
766   char *buffer_ptr;
767   size_t buffer_free;
768   size_t buffer_size;
769   rrdc_response_t *res;
770   int status;
771   char file_path[PATH_MAX];
772
773   if (filename == NULL)
774     return (-1);
775
776   memset (buffer, 0, sizeof (buffer));
777   buffer_ptr = &buffer[0];
778   buffer_free = sizeof (buffer);
779
780   status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
781   if (status != 0)
782     return (ENOBUFS);
783
784   pthread_mutex_lock (&lock);
785   filename = get_path (filename, file_path);
786   if (filename == NULL)
787   {
788     pthread_mutex_unlock (&lock);
789     return (-1);
790   }
791
792   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
793   if (status != 0)
794   {
795     pthread_mutex_unlock (&lock);
796     return (ENOBUFS);
797   }
798
799   assert (buffer_free < sizeof (buffer));
800   buffer_size = sizeof (buffer) - buffer_free;
801   assert (buffer[buffer_size - 1] == ' ');
802   buffer[buffer_size - 1] = '\n';
803
804   res = NULL;
805   status = request (buffer, buffer_size, &res);
806   pthread_mutex_unlock (&lock);
807
808   if (status != 0)
809     return (status);
810
811   status = res->status;
812   response_free (res);
813
814   return (status);
815 } /* }}} int rrdc_flush */
816
817 int rrdc_fetch (const char *filename, /* {{{ */
818     const char *cf,
819     time_t *ret_start, time_t *ret_end,
820     unsigned long *ret_step,
821     unsigned long *ret_ds_num,
822     char ***ret_ds_names,
823     rrd_value_t **ret_data)
824 {
825   char buffer[4096];
826   char *buffer_ptr;
827   size_t buffer_free;
828   size_t buffer_size;
829   rrdc_response_t *res;
830   char path_buffer[PATH_MAX];
831   const char *path_ptr;
832
833   char *str_tmp;
834   unsigned long flush_version;
835
836   time_t start;
837   time_t end;
838   unsigned long step;
839   unsigned long ds_num;
840   char **ds_names;
841
842   rrd_value_t *data;
843   size_t data_size;
844   size_t data_fill;
845
846   int status;
847   size_t current_line;
848   time_t t;
849
850   if ((filename == NULL) || (cf == NULL))
851     return (-1);
852
853   /* Send request {{{ */
854   memset (buffer, 0, sizeof (buffer));
855   buffer_ptr = &buffer[0];
856   buffer_free = sizeof (buffer);
857
858   status = buffer_add_string ("FETCH", &buffer_ptr, &buffer_free);
859   if (status != 0)
860     return (ENOBUFS);
861
862   /* change to path for rrdcached */
863   path_ptr = get_path (filename, path_buffer);
864   if (path_ptr == NULL)
865     return (EINVAL);
866
867   status = buffer_add_string (path_ptr, &buffer_ptr, &buffer_free);
868   if (status != 0)
869     return (ENOBUFS);
870
871   status = buffer_add_string (cf, &buffer_ptr, &buffer_free);
872   if (status != 0)
873     return (ENOBUFS);
874
875   if ((ret_start != NULL) && (*ret_start > 0))
876   {
877     char tmp[64];
878     snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_start);
879     tmp[sizeof (tmp) - 1] = 0;
880     status = buffer_add_string (tmp, &buffer_ptr, &buffer_free);
881     if (status != 0)
882       return (ENOBUFS);
883
884     if ((ret_end != NULL) && (*ret_end > 0))
885     {
886       snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_end);
887       tmp[sizeof (tmp) - 1] = 0;
888       status = buffer_add_string (tmp, &buffer_ptr, &buffer_free);
889       if (status != 0)
890         return (ENOBUFS);
891     }
892   }
893
894   assert (buffer_free < sizeof (buffer));
895   buffer_size = sizeof (buffer) - buffer_free;
896   assert (buffer[buffer_size - 1] == ' ');
897   buffer[buffer_size - 1] = '\n';
898
899   res = NULL;
900   status = request (buffer, buffer_size, &res);
901   if (status != 0)
902     return (status);
903
904   status = res->status;
905   if (status < 0)
906   {
907     rrd_set_error ("rrdcached: %s", res->message);
908     response_free (res);
909     return (status);
910   }
911   /* }}} Send request */
912
913   ds_names = NULL;
914   ds_num = 0;
915   data = NULL;
916   current_line = 0;
917
918   /* Macros to make error handling a little easier (i. e. less to type and
919    * read. `BAIL_OUT' sets the error message, frees all dynamically allocated
920    * variables and returns the provided status code. */
921 #define BAIL_OUT(status, ...) do { \
922     rrd_set_error ("rrdc_fetch: " __VA_ARGS__); \
923     free (data); \
924     if (ds_names != 0) { size_t k; for (k = 0; k < ds_num; k++) free (ds_names[k]); } \
925     free (ds_names); \
926     response_free (res); \
927     return (status); \
928   } while (0)
929
930 #define READ_NUMERIC_FIELD(name,type,var) do { \
931     char *key; \
932     unsigned long value; \
933     assert (current_line < res->lines_num); \
934     status = parse_ulong_header (res->lines[current_line], &key, &value); \
935     if (status != 0) \
936       BAIL_OUT (-1, "Unable to parse header `%s'", name); \
937     if (strcasecmp (key, name) != 0) \
938       BAIL_OUT (-1, "Unexpected header line: Expected `%s', got `%s'", name, key); \
939     var = (type) value; \
940     current_line++; \
941   } while (0)
942
943   if (res->lines_num < 1)
944     BAIL_OUT (-1, "Premature end of response packet");
945
946   /* We're making some very strong assumptions about the fields below. We
947    * therefore check the version of the `flush' command first, so that later
948    * versions can change the order of fields and it's easier to implement
949    * backwards compatibility. */
950   READ_NUMERIC_FIELD ("FlushVersion", unsigned long, flush_version);
951   if (flush_version != 1)
952     BAIL_OUT (-1, "Don't know how to handle flush format version %lu.",
953         flush_version);
954
955   if (res->lines_num < 5)
956     BAIL_OUT (-1, "Premature end of response packet");
957
958   READ_NUMERIC_FIELD ("Start", time_t, start);
959   READ_NUMERIC_FIELD ("End", time_t, end);
960   if (start >= end)
961     BAIL_OUT (-1, "Malformed start and end times: start = %lu; end = %lu;",
962         (unsigned long) start,
963         (unsigned long) end);
964
965   READ_NUMERIC_FIELD ("Step", unsigned long, step);
966   if (step < 1)
967     BAIL_OUT (-1, "Invalid number for Step: %lu", step);
968
969   READ_NUMERIC_FIELD ("DSCount", unsigned long, ds_num);
970   if (ds_num < 1)
971     BAIL_OUT (-1, "Invalid number for DSCount: %lu", ds_num);
972   
973   /* It's time to allocate some memory */
974   ds_names = calloc ((size_t) ds_num, sizeof (*ds_names));
975   if (ds_names == NULL)
976     BAIL_OUT (-1, "Out of memory");
977
978   status = parse_char_array_header (res->lines[current_line],
979       &str_tmp, ds_names, (size_t) ds_num, /* alloc = */ 1);
980   if (status != 0)
981     BAIL_OUT (-1, "Unable to parse header `DSName'");
982   if (strcasecmp ("DSName", str_tmp) != 0)
983     BAIL_OUT (-1, "Unexpected header line: Expected `DSName', got `%s'", str_tmp);
984   current_line++;
985
986   data_size = ds_num * (end - start) / step;
987   if (data_size < 1)
988     BAIL_OUT (-1, "No data returned or headers invalid.");
989
990   if (res->lines_num != (6 + (data_size / ds_num)))
991     BAIL_OUT (-1, "Got %zu lines, expected %zu",
992         res->lines_num, (6 + (data_size / ds_num)));
993
994   data = calloc (data_size, sizeof (*data));
995   if (data == NULL)
996     BAIL_OUT (-1, "Out of memory");
997   
998
999   data_fill = 0;
1000   for (t = start + step; t <= end; t += step, current_line++)
1001   {
1002     time_t tmp;
1003
1004     assert (current_line < res->lines_num);
1005
1006     status = parse_value_array_header (res->lines[current_line],
1007         &tmp, data + data_fill, (size_t) ds_num);
1008     if (status != 0)
1009       BAIL_OUT (-1, "Cannot parse value line");
1010
1011     data_fill += (size_t) ds_num;
1012   }
1013
1014   *ret_start = start;
1015   *ret_end = end;
1016   *ret_step = step;
1017   *ret_ds_num = ds_num;
1018   *ret_ds_names = ds_names;
1019   *ret_data = data;
1020
1021   response_free (res);
1022   return (0);
1023 #undef READ_NUMERIC_FIELD
1024 #undef BAIL_OUT
1025 } /* }}} int rrdc_flush */
1026
1027 /* convenience function; if there is a daemon specified, or if we can
1028  * detect one from the environment, then flush the file.  Otherwise, no-op
1029  */
1030 int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
1031 {
1032   int status = 0;
1033
1034   rrdc_connect(opt_daemon);
1035
1036   if (rrdc_is_connected(opt_daemon))
1037   {
1038     rrd_clear_error();
1039     status = rrdc_flush (filename);
1040
1041     if (status != 0 && !rrd_test_error())
1042     {
1043       if (status > 0)
1044       {
1045         rrd_set_error("rrdc_flush (%s) failed: %s",
1046                       filename, rrd_strerror(status));
1047       }
1048       else if (status < 0)
1049       {
1050         rrd_set_error("rrdc_flush (%s) failed with status %i.",
1051                       filename, status);
1052       }
1053     }
1054   } /* if (rrdc_is_connected(..)) */
1055
1056   return status;
1057 } /* }}} int rrdc_flush_if_daemon */
1058
1059
1060 int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
1061 {
1062   rrdc_stats_t *head;
1063   rrdc_stats_t *tail;
1064
1065   rrdc_response_t *res;
1066
1067   int status;
1068   size_t i;
1069
1070   /* Protocol example: {{{
1071    * ->  STATS
1072    * <-  5 Statistics follow
1073    * <-  QueueLength: 0
1074    * <-  UpdatesWritten: 0
1075    * <-  DataSetsWritten: 0
1076    * <-  TreeNodesNumber: 0
1077    * <-  TreeDepth: 0
1078    * }}} */
1079
1080   res = NULL;
1081   pthread_mutex_lock (&lock);
1082   status = request ("STATS\n", strlen ("STATS\n"), &res);
1083   pthread_mutex_unlock (&lock);
1084
1085   if (status != 0)
1086     return (status);
1087
1088   if (res->status <= 0)
1089   {
1090     response_free (res);
1091     return (EIO);
1092   }
1093
1094   head = NULL;
1095   tail = NULL;
1096   for (i = 0; i < res->lines_num; i++)
1097   {
1098     char *key;
1099     char *value;
1100     char *endptr;
1101     rrdc_stats_t *s;
1102
1103     key = res->lines[i];
1104     value = strchr (key, ':');
1105     if (value == NULL)
1106       continue;
1107     *value = 0;
1108     value++;
1109
1110     while ((value[0] == ' ') || (value[0] == '\t'))
1111       value++;
1112
1113     s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
1114     if (s == NULL)
1115       continue;
1116     memset (s, 0, sizeof (*s));
1117
1118     s->name = strdup (key);
1119
1120     endptr = NULL;
1121     if ((strcmp ("QueueLength", key) == 0)
1122         || (strcmp ("TreeDepth", key) == 0)
1123         || (strcmp ("TreeNodesNumber", key) == 0))
1124     {
1125       s->type = RRDC_STATS_TYPE_GAUGE;
1126       s->value.gauge = strtod (value, &endptr);
1127     }
1128     else if ((strcmp ("DataSetsWritten", key) == 0)
1129         || (strcmp ("FlushesReceived", key) == 0)
1130         || (strcmp ("JournalBytes", key) == 0)
1131         || (strcmp ("JournalRotate", key) == 0)
1132         || (strcmp ("UpdatesReceived", key) == 0)
1133         || (strcmp ("UpdatesWritten", key) == 0))
1134     {
1135       s->type = RRDC_STATS_TYPE_COUNTER;
1136       s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
1137     }
1138     else
1139     {
1140       free (s);
1141       continue;
1142     }
1143
1144     /* Conversion failed */
1145     if (endptr == value)
1146     {
1147       free (s);
1148       continue;
1149     }
1150
1151     if (head == NULL)
1152     {
1153       head = s;
1154       tail = s;
1155       s->next = NULL;
1156     }
1157     else
1158     {
1159       tail->next = s;
1160       tail = s;
1161     }
1162   } /* for (i = 0; i < res->lines_num; i++) */
1163
1164   response_free (res);
1165
1166   if (head == NULL)
1167     return (EPROTO);
1168
1169   *ret_stats = head;
1170   return (0);
1171 } /* }}} int rrdc_stats_get */
1172
1173 void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
1174 {
1175   rrdc_stats_t *this;
1176
1177   this = ret_stats;
1178   while (this != NULL)
1179   {
1180     rrdc_stats_t *next;
1181
1182     next = this->next;
1183
1184     if (this->name != NULL)
1185     {
1186       free ((char *)this->name);
1187       this->name = NULL;
1188     }
1189     free (this);
1190
1191     this = next;
1192   } /* while (this != NULL) */
1193 } /* }}} void rrdc_stats_free */
1194
1195 /*
1196  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1197  */