034beca9471a6a9ed7899e6c6f65d8b3322d386b
[rrdtool.git] / src / rrd_client.c
1 /**
2  * RRDTool - src/rrd_client.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  *   Sebastian tokkee Harl <sh at tokkee.org>
21  **/
22
23 #include "rrd.h"
24 #include "rrd_tool.h"
25 #include "rrd_client.h"
26
27 #include <stdlib.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <errno.h>
31 #include <assert.h>
32 #include <pthread.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <sys/un.h>
36 #include <netdb.h>
37 #include <limits.h>
38
39 #ifndef ENODATA
40 #define ENODATA ENOENT
41 #endif
42
43 struct rrdc_response_s
44 {
45   int status;
46   char *message;
47   char **lines;
48   size_t lines_num;
49 };
50 typedef struct rrdc_response_s rrdc_response_t;
51
52 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
53 static int sd = -1;
54 static FILE *sh = NULL;
55 static char *sd_path = NULL; /* cache the path for sd */
56
57 /* get_path: Return a path name appropriate to be sent to the daemon.
58  *
59  * When talking to a local daemon (thru a UNIX socket), relative path names
60  * are resolved to absolute path names to allow for transparent integration
61  * into existing solutions (as requested by Tobi). Else, absolute path names
62  * are not allowed, since path name translation is done by the server.
63  *
64  * One must hold `lock' when calling this function. */
65 static const char *get_path (const char *path, char *resolved_path) /* {{{ */
66 {
67   const char *ret = path;
68   int is_unix = 0;
69
70   if ((*sd_path == '/')
71       || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0))
72     is_unix = 1;
73
74   if (*path == '/') /* absolute path */
75   {
76     if (! is_unix)
77     {
78       rrd_set_error ("absolute path names not allowed when talking "
79           "to a remote daemon");
80       return (NULL);
81     }
82     /* else: nothing to do */
83   }
84   else /* relative path */
85   {
86     if (is_unix)
87     {
88       realpath (path, resolved_path);
89       ret = resolved_path;
90     }
91     /* else: nothing to do */
92   }
93   return (ret);
94 } /* }}} char *get_path */
95
96 /* One must hold `lock' when calling `close_connection'. */
97 static void close_connection (void) /* {{{ */
98 {
99   if (sh != NULL)
100   {
101     fclose (sh);
102     sh = NULL;
103     sd = -1;
104   }
105   else if (sd >= 0)
106   {
107     close (sd);
108     sd = -1;
109   }
110
111   if (sd_path != NULL)
112     free (sd_path);
113   sd_path = NULL;
114 } /* }}} void close_connection */
115
116 static int buffer_add_string (const char *str, /* {{{ */
117     char **buffer_ret, size_t *buffer_size_ret)
118 {
119   char *buffer;
120   size_t buffer_size;
121   size_t buffer_pos;
122   size_t i;
123   int status;
124
125   buffer = *buffer_ret;
126   buffer_size = *buffer_size_ret;
127   buffer_pos = 0;
128
129   i = 0;
130   status = -1;
131   while (buffer_pos < buffer_size)
132   {
133     if (str[i] == 0)
134     {
135       buffer[buffer_pos] = ' ';
136       buffer_pos++;
137       status = 0;
138       break;
139     }
140     else if ((str[i] == ' ') || (str[i] == '\\'))
141     {
142       if (buffer_pos >= (buffer_size - 1))
143         break;
144       buffer[buffer_pos] = '\\';
145       buffer_pos++;
146       buffer[buffer_pos] = str[i];
147       buffer_pos++;
148     }
149     else
150     {
151       buffer[buffer_pos] = str[i];
152       buffer_pos++;
153     }
154     i++;
155   } /* while (buffer_pos < buffer_size) */
156
157   if (status != 0)
158     return (-1);
159
160   *buffer_ret = buffer + buffer_pos;
161   *buffer_size_ret = buffer_size - buffer_pos;
162
163   return (0);
164 } /* }}} int buffer_add_string */
165
166 static int buffer_add_value (const char *value, /* {{{ */
167     char **buffer_ret, size_t *buffer_size_ret)
168 {
169   char temp[4096];
170
171   if (strncmp (value, "N:", 2) == 0)
172     snprintf (temp, sizeof (temp), "%lu:%s",
173         (unsigned long) time (NULL), value + 2);
174   else
175     strncpy (temp, value, sizeof (temp));
176   temp[sizeof (temp) - 1] = 0;
177
178   return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
179 } /* }}} int buffer_add_value */
180
181 /* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
182  * the Perl function `chomp'. Returns the number of characters that have been
183  * removed. */
184 static int chomp (char *str) /* {{{ */
185 {
186   size_t len;
187   int removed;
188
189   if (str == NULL)
190     return (-1);
191
192   len = strlen (str);
193   removed = 0;
194   while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
195   {
196     str[len - 1] = 0;
197     len--;
198     removed++;
199   }
200
201   return (removed);
202 } /* }}} int chomp */
203
204 static void response_free (rrdc_response_t *res) /* {{{ */
205 {
206   if (res == NULL)
207     return;
208
209   if (res->lines != NULL)
210   {
211     size_t i;
212
213     for (i = 0; i < res->lines_num; i++)
214       if (res->lines[i] != NULL)
215         free (res->lines[i]);
216     free (res->lines);
217   }
218
219   free (res);
220 } /* }}} void response_free */
221
222 static int response_read (rrdc_response_t **ret_response) /* {{{ */
223 {
224   rrdc_response_t *ret;
225
226   char buffer[4096];
227   char *buffer_ptr;
228
229   size_t i;
230
231   if (sh == NULL)
232     return (-1);
233
234   ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
235   if (ret == NULL)
236     return (-2);
237   memset (ret, 0, sizeof (*ret));
238   ret->lines = NULL;
239   ret->lines_num = 0;
240
241   buffer_ptr = fgets (buffer, sizeof (buffer), sh);
242   if (buffer_ptr == NULL)
243     return (-3);
244   chomp (buffer);
245
246   ret->status = strtol (buffer, &ret->message, 0);
247   if (buffer == ret->message)
248   {
249     response_free (ret);
250     return (-4);
251   }
252   /* Skip leading whitespace of the status message */
253   ret->message += strspn (ret->message, " \t");
254
255   if (ret->status <= 0)
256   {
257     if (ret->status < 0)
258       rrd_set_error("rrdcached: %s", ret->message);
259     *ret_response = ret;
260     return (0);
261   }
262
263   ret->lines = (char **) malloc (sizeof (char *) * ret->status);
264   if (ret->lines == NULL)
265   {
266     response_free (ret);
267     return (-5);
268   }
269   memset (ret->lines, 0, sizeof (char *) * ret->status);
270   ret->lines_num = (size_t) ret->status;
271
272   for (i = 0; i < ret->lines_num; i++)
273   {
274     buffer_ptr = fgets (buffer, sizeof (buffer), sh);
275     if (buffer_ptr == NULL)
276     {
277       response_free (ret);
278       return (-6);
279     }
280     chomp (buffer);
281
282     ret->lines[i] = strdup (buffer);
283     if (ret->lines[i] == NULL)
284     {
285       response_free (ret);
286       return (-7);
287     }
288   }
289
290   *ret_response = ret;
291   return (0);
292 } /* }}} rrdc_response_t *response_read */
293
294 static int request (const char *buffer, size_t buffer_size, /* {{{ */
295     rrdc_response_t **ret_response)
296 {
297   int status;
298   rrdc_response_t *res;
299
300   if (sh == NULL)
301     return (ENOTCONN);
302
303   status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
304   if (status != 1)
305   {
306     close_connection ();
307     rrd_set_error("request: socket error (%d) while talking to rrdcached",
308                   status);
309     return (-1);
310   }
311   fflush (sh);
312
313   res = NULL;
314   status = response_read (&res);
315
316   if (status != 0)
317   {
318     if (status < 0)
319       rrd_set_error("request: internal error while talking to rrdcached");
320     return (status);
321   }
322
323   *ret_response = res;
324   return (0);
325 } /* }}} int request */
326
327 /* determine whether we are connected to the specified daemon_addr if
328  * NULL, return whether we are connected at all
329  */
330 int rrdc_is_connected(const char *daemon_addr) /* {{{ */
331 {
332   if (sd < 0)
333     return 0;
334   else if (daemon_addr == NULL)
335   {
336     /* here we have to handle the case i.e.
337      *   UPDATE --daemon ...; UPDATEV (no --daemon) ...
338      * In other words: we have a cached connection,
339      * but it is not specified in the current command.
340      * Daemon is only implied in this case if set in ENV
341      */
342     if (getenv(ENV_RRDCACHED_ADDRESS) != NULL)
343       return 1;
344     else
345       return 0;
346   }
347   else if (strcmp(daemon_addr, sd_path) == 0)
348     return 1;
349   else
350     return 0;
351
352 } /* }}} int rrdc_is_connected */
353
354 static int rrdc_connect_unix (const char *path) /* {{{ */
355 {
356   struct sockaddr_un sa;
357   int status;
358
359   assert (path != NULL);
360   assert (sd == -1);
361
362   sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
363   if (sd < 0)
364   {
365     status = errno;
366     return (status);
367   }
368
369   memset (&sa, 0, sizeof (sa));
370   sa.sun_family = AF_UNIX;
371   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
372
373   status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
374   if (status != 0)
375   {
376     status = errno;
377     close_connection ();
378     return (status);
379   }
380
381   sh = fdopen (sd, "r+");
382   if (sh == NULL)
383   {
384     status = errno;
385     close_connection ();
386     return (status);
387   }
388
389   return (0);
390 } /* }}} int rrdc_connect_unix */
391
392 static int rrdc_connect_network (const char *addr_orig) /* {{{ */
393 {
394   struct addrinfo ai_hints;
395   struct addrinfo *ai_res;
396   struct addrinfo *ai_ptr;
397   char addr_copy[NI_MAXHOST];
398   char *addr;
399   char *port;
400
401   assert (addr_orig != NULL);
402   assert (sd == -1);
403
404   strncpy(addr_copy, addr_orig, sizeof(addr_copy));
405   addr_copy[sizeof(addr_copy) - 1] = '\0';
406   addr = addr_copy;
407
408   int status;
409   memset (&ai_hints, 0, sizeof (ai_hints));
410   ai_hints.ai_flags = 0;
411 #ifdef AI_ADDRCONFIG
412   ai_hints.ai_flags |= AI_ADDRCONFIG;
413 #endif
414   ai_hints.ai_family = AF_UNSPEC;
415   ai_hints.ai_socktype = SOCK_STREAM;
416
417   port = NULL;
418   if (*addr == '[') /* IPv6+port format */
419   {
420     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
421     addr++;
422
423     port = strchr (addr, ']');
424     if (port == NULL)
425     {
426       rrd_set_error("malformed address: %s", addr_orig);
427       return (-1);
428     }
429     *port = 0;
430     port++;
431
432     if (*port == ':')
433       port++;
434     else if (*port == 0)
435       port = NULL;
436     else
437     {
438       rrd_set_error("garbage after address: %s", port);
439       return (-1);
440     }
441   } /* if (*addr == '[') */
442   else
443   {
444     port = rindex(addr, ':');
445     if (port != NULL)
446     {
447       *port = 0;
448       port++;
449     }
450   }
451
452   ai_res = NULL;
453   status = getaddrinfo (addr,
454                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
455                         &ai_hints, &ai_res);
456   if (status != 0)
457   {
458     rrd_set_error ("failed to resolve address `%s' (port %s): %s",
459         addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port,
460         gai_strerror (status));
461     return (-1);
462   }
463
464   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
465   {
466     sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
467     if (sd < 0)
468     {
469       status = errno;
470       sd = -1;
471       continue;
472     }
473
474     status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
475     if (status != 0)
476     {
477       status = errno;
478       close_connection();
479       continue;
480     }
481
482     sh = fdopen (sd, "r+");
483     if (sh == NULL)
484     {
485       status = errno;
486       close_connection ();
487       continue;
488     }
489
490     assert (status == 0);
491     break;
492   } /* for (ai_ptr) */
493
494   return (status);
495 } /* }}} int rrdc_connect_network */
496
497 int rrdc_connect (const char *addr) /* {{{ */
498 {
499   int status = 0;
500
501   if (addr == NULL)
502     addr = getenv (ENV_RRDCACHED_ADDRESS);
503
504   if (addr == NULL)
505     return 0;
506
507   pthread_mutex_lock(&lock);
508
509   if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
510   {
511     /* connection to the same daemon; use cached connection */
512     pthread_mutex_unlock (&lock);
513     return (0);
514   }
515   else
516   {
517     close_connection();
518   }
519
520   rrd_clear_error ();
521   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
522     status = rrdc_connect_unix (addr + strlen ("unix:"));
523   else if (addr[0] == '/')
524     status = rrdc_connect_unix (addr);
525   else
526     status = rrdc_connect_network(addr);
527
528   if (status == 0 && sd >= 0)
529     sd_path = strdup(addr);
530   else
531   {
532     char *err = rrd_test_error () ? rrd_get_error () : "Internal error";
533     /* err points the string that gets written to by rrd_set_error(), thus we
534      * cannot pass it to that function */
535     err = strdup (err);
536     rrd_set_error("Unable to connect to rrdcached: %s",
537                   (status < 0)
538                   ? (err ? err : "Internal error")
539                   : rrd_strerror (status));
540     if (err != NULL)
541       free (err);
542   }
543
544   pthread_mutex_unlock (&lock);
545   return (status);
546 } /* }}} int rrdc_connect */
547
548 int rrdc_disconnect (void) /* {{{ */
549 {
550   pthread_mutex_lock (&lock);
551
552   close_connection();
553
554   pthread_mutex_unlock (&lock);
555
556   return (0);
557 } /* }}} int rrdc_disconnect */
558
559 int rrdc_update (const char *filename, int values_num, /* {{{ */
560                 const char * const *values)
561 {
562   char buffer[4096];
563   char *buffer_ptr;
564   size_t buffer_free;
565   size_t buffer_size;
566   rrdc_response_t *res;
567   int status;
568   int i;
569   char file_path[PATH_MAX];
570
571   memset (buffer, 0, sizeof (buffer));
572   buffer_ptr = &buffer[0];
573   buffer_free = sizeof (buffer);
574
575   status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
576   if (status != 0)
577     return (ENOBUFS);
578
579   pthread_mutex_lock (&lock);
580   filename = get_path (filename, file_path);
581   if (filename == NULL)
582   {
583     pthread_mutex_unlock (&lock);
584     return (-1);
585   }
586
587   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
588   if (status != 0)
589   {
590     pthread_mutex_unlock (&lock);
591     return (ENOBUFS);
592   }
593
594   for (i = 0; i < values_num; i++)
595   {
596     status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
597     if (status != 0)
598     {
599       pthread_mutex_unlock (&lock);
600       return (ENOBUFS);
601     }
602   }
603
604   assert (buffer_free < sizeof (buffer));
605   buffer_size = sizeof (buffer) - buffer_free;
606   assert (buffer[buffer_size - 1] == ' ');
607   buffer[buffer_size - 1] = '\n';
608
609   res = NULL;
610   status = request (buffer, buffer_size, &res);
611   pthread_mutex_unlock (&lock);
612
613   if (status != 0)
614     return (status);
615
616   status = res->status;
617   response_free (res);
618
619   return (status);
620 } /* }}} int rrdc_update */
621
622 int rrdc_flush (const char *filename) /* {{{ */
623 {
624   char buffer[4096];
625   char *buffer_ptr;
626   size_t buffer_free;
627   size_t buffer_size;
628   rrdc_response_t *res;
629   int status;
630   char file_path[PATH_MAX];
631
632   if (filename == NULL)
633     return (-1);
634
635   memset (buffer, 0, sizeof (buffer));
636   buffer_ptr = &buffer[0];
637   buffer_free = sizeof (buffer);
638
639   status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
640   if (status != 0)
641     return (ENOBUFS);
642
643   pthread_mutex_lock (&lock);
644   filename = get_path (filename, file_path);
645   if (filename == NULL)
646   {
647     pthread_mutex_unlock (&lock);
648     return (-1);
649   }
650
651   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
652   if (status != 0)
653   {
654     pthread_mutex_unlock (&lock);
655     return (ENOBUFS);
656   }
657
658   assert (buffer_free < sizeof (buffer));
659   buffer_size = sizeof (buffer) - buffer_free;
660   assert (buffer[buffer_size - 1] == ' ');
661   buffer[buffer_size - 1] = '\n';
662
663   res = NULL;
664   status = request (buffer, buffer_size, &res);
665   pthread_mutex_unlock (&lock);
666
667   if (status != 0)
668     return (status);
669
670   status = res->status;
671   response_free (res);
672
673   return (status);
674 } /* }}} int rrdc_flush */
675
676
677 /* convenience function; if there is a daemon specified, or if we can
678  * detect one from the environment, then flush the file.  Otherwise, no-op
679  */
680 int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
681 {
682   int status = 0;
683
684   rrdc_connect(opt_daemon);
685
686   if (rrdc_is_connected(opt_daemon))
687   {
688     rrd_clear_error();
689     status = rrdc_flush (filename);
690
691     if (status != 0 && !rrd_test_error())
692     {
693       if (status > 0)
694       {
695         rrd_set_error("rrdc_flush (%s) failed: %s",
696                       filename, rrd_strerror(status));
697       }
698       else if (status < 0)
699       {
700         rrd_set_error("rrdc_flush (%s) failed with status %i.",
701                       filename, status);
702       }
703     }
704   } /* if (rrdc_is_connected(..)) */
705
706   return status;
707 } /* }}} int rrdc_flush_if_daemon */
708
709
710 int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
711 {
712   rrdc_stats_t *head;
713   rrdc_stats_t *tail;
714
715   rrdc_response_t *res;
716
717   int status;
718   size_t i;
719
720   /* Protocol example: {{{
721    * ->  STATS
722    * <-  5 Statistics follow
723    * <-  QueueLength: 0
724    * <-  UpdatesWritten: 0
725    * <-  DataSetsWritten: 0
726    * <-  TreeNodesNumber: 0
727    * <-  TreeDepth: 0
728    * }}} */
729
730   res = NULL;
731   pthread_mutex_lock (&lock);
732   status = request ("STATS\n", strlen ("STATS\n"), &res);
733   pthread_mutex_unlock (&lock);
734
735   if (status != 0)
736     return (status);
737
738   if (res->status <= 0)
739   {
740     response_free (res);
741     return (EIO);
742   }
743
744   head = NULL;
745   tail = NULL;
746   for (i = 0; i < res->lines_num; i++)
747   {
748     char *key;
749     char *value;
750     char *endptr;
751     rrdc_stats_t *s;
752
753     key = res->lines[i];
754     value = strchr (key, ':');
755     if (value == NULL)
756       continue;
757     *value = 0;
758     value++;
759
760     while ((value[0] == ' ') || (value[0] == '\t'))
761       value++;
762
763     s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
764     if (s == NULL)
765       continue;
766     memset (s, 0, sizeof (*s));
767
768     s->name = strdup (key);
769
770     endptr = NULL;
771     if ((strcmp ("QueueLength", key) == 0)
772         || (strcmp ("TreeDepth", key) == 0)
773         || (strcmp ("TreeNodesNumber", key) == 0))
774     {
775       s->type = RRDC_STATS_TYPE_GAUGE;
776       s->value.gauge = strtod (value, &endptr);
777     }
778     else if ((strcmp ("DataSetsWritten", key) == 0)
779         || (strcmp ("FlushesReceived", key) == 0)
780         || (strcmp ("JournalBytes", key) == 0)
781         || (strcmp ("JournalRotate", key) == 0)
782         || (strcmp ("UpdatesReceived", key) == 0)
783         || (strcmp ("UpdatesWritten", key) == 0))
784     {
785       s->type = RRDC_STATS_TYPE_COUNTER;
786       s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
787     }
788     else
789     {
790       free (s);
791       continue;
792     }
793
794     /* Conversion failed */
795     if (endptr == value)
796     {
797       free (s);
798       continue;
799     }
800
801     if (head == NULL)
802     {
803       head = s;
804       tail = s;
805       s->next = NULL;
806     }
807     else
808     {
809       tail->next = s;
810       tail = s;
811     }
812   } /* for (i = 0; i < res->lines_num; i++) */
813
814   response_free (res);
815
816   if (head == NULL)
817     return (EPROTO);
818
819   *ret_stats = head;
820   return (0);
821 } /* }}} int rrdc_stats_get */
822
823 void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
824 {
825   rrdc_stats_t *this;
826
827   this = ret_stats;
828   while (this != NULL)
829   {
830     rrdc_stats_t *next;
831
832     next = this->next;
833
834     if (this->name != NULL)
835     {
836       free ((char *)this->name);
837       this->name = NULL;
838     }
839     free (this);
840
841     this = next;
842   } /* while (this != NULL) */
843 } /* }}} void rrdc_stats_free */
844
845 /*
846  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
847  */