RRDcached patch. This implements an infrastructure, where rrd updates can be
[rrdtool.git] / src / rrd_client.c
1 /**
2  * RRDTool - src/rrd_client.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "rrd.h"
23 #include "rrd_client.h"
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <assert.h>
29 #include <pthread.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/un.h>
33 #include <netdb.h>
34
35 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
36 static int sd = -1;
37
38 static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */
39 {
40   char    *buffer;
41   size_t   buffer_used;
42   size_t   buffer_free;
43   ssize_t  status;
44
45   buffer       = (char *) buffer_void;
46   buffer_used  = 0;
47   buffer_free  = buffer_size;
48
49   while (buffer_free > 0)
50   {
51     status = read (sd, buffer + buffer_used, buffer_free);
52     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
53       continue;
54
55     if (status < 0)
56       return (-1);
57
58     if (status == 0)
59     {
60       close (sd);
61       sd = -1;
62       errno = EPROTO;
63       return (-1);
64     }
65
66     assert ((0 > status) || (buffer_free >= (size_t) status));
67
68     buffer_free = buffer_free - status;
69     buffer_used = buffer_used + status;
70
71     if (buffer[buffer_used - 1] == '\n')
72       break;
73   }
74
75   if (buffer[buffer_used - 1] != '\n')
76   {
77     errno = ENOBUFS;
78     return (-1);
79   }
80
81   buffer[buffer_used - 1] = 0;
82   return (buffer_used);
83 } /* }}} ssize_t sread */
84
85 static ssize_t swrite (const void *buf, size_t count) /* {{{ */
86 {
87   const char *ptr;
88   size_t      nleft;
89   ssize_t     status;
90
91   ptr   = (const char *) buf;
92   nleft = count;
93
94   while (nleft > 0)
95   {
96     status = write (sd, (const void *) ptr, nleft);
97
98     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
99       continue;
100
101     if (status < 0)
102     {
103       close (sd);
104       sd = -1;
105       return (status);
106     }
107
108     nleft = nleft - status;
109     ptr   = ptr   + status;
110   }
111
112   return (0);
113 } /* }}} ssize_t swrite */
114
115 static int buffer_add_string (const char *str, /* {{{ */
116     char **buffer_ret, size_t *buffer_size_ret)
117 {
118   char *buffer;
119   size_t buffer_size;
120   size_t buffer_pos;
121   size_t i;
122   int status;
123
124   buffer = *buffer_ret;
125   buffer_size = *buffer_size_ret;
126   buffer_pos = 0;
127
128   i = 0;
129   status = -1;
130   while (buffer_pos < buffer_size)
131   {
132     if (str[i] == 0)
133     {
134       buffer[buffer_pos] = ' ';
135       buffer_pos++;
136       status = 0;
137       break;
138     }
139     else if ((str[i] == ' ') || (str[i] == '\\'))
140     {
141       if (buffer_pos >= (buffer_size - 1))
142         break;
143       buffer[buffer_pos] = '\\';
144       buffer_pos++;
145       buffer[buffer_pos] = str[i];
146       buffer_pos++;
147     }
148     else
149     {
150       buffer[buffer_pos] = str[i];
151       buffer_pos++;
152     }
153     i++;
154   } /* while (buffer_pos < buffer_size) */
155
156   if (status != 0)
157     return (-1);
158
159   *buffer_ret = buffer + buffer_pos;
160   *buffer_size_ret = buffer_size - buffer_pos;
161
162   return (0);
163 } /* }}} int buffer_add_string */
164
165 static int buffer_add_value (const char *value, /* {{{ */
166     char **buffer_ret, size_t *buffer_size_ret)
167 {
168   char temp[4096];
169
170   if (strncmp (value, "N:", 2) == 0)
171     snprintf (temp, sizeof (temp), "%lu:%s",
172         (unsigned long) time (NULL), value + 2);
173   else
174     strncpy (temp, value, sizeof (temp));
175   temp[sizeof (temp) - 1] = 0;
176
177   return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
178 } /* }}} int buffer_add_value */
179
180 static int rrdc_connect_unix (const char *path) /* {{{ */
181 {
182   struct sockaddr_un sa;
183   int status;
184
185   assert (path != NULL);
186
187   pthread_mutex_lock (&lock);
188
189   if (sd >= 0)
190   {
191     pthread_mutex_unlock (&lock);
192     return (0);
193   }
194
195   sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
196   if (sd < 0)
197   {
198     status = errno;
199     pthread_mutex_unlock (&lock);
200     return (status);
201   }
202
203   memset (&sa, 0, sizeof (sa));
204   sa.sun_family = AF_UNIX;
205   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
206
207   status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
208   if (status != 0)
209   {
210     status = errno;
211     pthread_mutex_unlock (&lock);
212     return (status);
213   }
214
215   pthread_mutex_unlock (&lock);
216
217   return (0);
218 } /* }}} int rrdc_connect_unix */
219
220 int rrdc_connect (const char *addr) /* {{{ */
221 {
222   struct addrinfo ai_hints;
223   struct addrinfo *ai_res;
224   struct addrinfo *ai_ptr;
225   int status;
226
227   if (addr == NULL)
228     addr = RRDCACHED_DEFAULT_ADDRESS;
229
230   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
231     return (rrdc_connect_unix (addr + strlen ("unix:")));
232   else if (addr[0] == '/')
233     return (rrdc_connect_unix (addr));
234
235   pthread_mutex_lock (&lock);
236
237   if (sd >= 0)
238   {
239     pthread_mutex_unlock (&lock);
240     return (0);
241   }
242
243   memset (&ai_hints, 0, sizeof (ai_hints));
244   ai_hints.ai_flags = 0;
245 #ifdef AI_ADDRCONFIG
246   ai_hints.ai_flags |= AI_ADDRCONFIG;
247 #endif
248   ai_hints.ai_family = AF_UNSPEC;
249   ai_hints.ai_socktype = SOCK_STREAM;
250
251   ai_res = NULL;
252   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
253   if (status != 0)
254   {
255     pthread_mutex_unlock (&lock);
256     return (status);
257   }
258
259   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
260   {
261     sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
262     if (sd < 0)
263     {
264       status = errno;
265       sd = -1;
266       continue;
267     }
268
269     status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
270     if (status != 0)
271     {
272       status = errno;
273       close (sd);
274       sd = -1;
275       continue;
276     }
277
278     assert (status == 0);
279     break;
280   } /* for (ai_ptr) */
281   pthread_mutex_unlock (&lock);
282
283   return (status);
284 } /* }}} int rrdc_connect */
285
286 int rrdc_disconnect (void) /* {{{ */
287 {
288   pthread_mutex_lock (&lock);
289
290   if (sd < 0)
291   {
292     pthread_mutex_unlock (&lock);
293     return (0);
294   }
295
296   close (sd);
297   sd = -1;
298
299   pthread_mutex_unlock (&lock);
300
301   return (0);
302 } /* }}} int rrdc_disconnect */
303
304 int rrdc_update (const char *filename, int values_num, /* {{{ */
305                 const char * const *values)
306 {
307   char buffer[4096];
308   char *buffer_ptr;
309   size_t buffer_free;
310   size_t buffer_size;
311   int status;
312   int i;
313
314   memset (buffer, 0, sizeof (buffer));
315   buffer_ptr = &buffer[0];
316   buffer_free = sizeof (buffer);
317
318   status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
319   if (status != 0)
320     return (ENOBUFS);
321
322   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
323   if (status != 0)
324     return (ENOBUFS);
325
326   for (i = 0; i < values_num; i++)
327   {
328     status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
329     if (status != 0)
330       return (ENOBUFS);
331   }
332
333   assert (buffer_free < sizeof (buffer));
334   buffer_size = sizeof (buffer) - buffer_free;
335   assert (buffer[buffer_size - 1] == ' ');
336   buffer[buffer_size - 1] = '\n';
337
338   pthread_mutex_lock (&lock);
339
340   if (sd < 0)
341   {
342     pthread_mutex_unlock (&lock);
343     return (ENOTCONN);
344   }
345
346   status = swrite (buffer, buffer_size);
347   if (status != 0)
348   {
349     pthread_mutex_unlock (&lock);
350     return (status);
351   }
352
353   status = sread (buffer, sizeof (buffer));
354   if (status < 0)
355   {
356     status = errno;
357     pthread_mutex_unlock (&lock);
358     return (status);
359   }
360   else if (status == 0)
361   {
362     pthread_mutex_unlock (&lock);
363     return (ENODATA);
364   }
365
366   pthread_mutex_unlock (&lock);
367
368   status = atoi (buffer);
369   return (status);
370 } /* }}} int rrdc_update */
371
372 int rrdc_flush (const char *filename) /* {{{ */
373 {
374   char buffer[4096];
375   char *buffer_ptr;
376   size_t buffer_free;
377   size_t buffer_size;
378   int status;
379
380   if (filename == NULL)
381     return (-1);
382
383   memset (buffer, 0, sizeof (buffer));
384   buffer_ptr = &buffer[0];
385   buffer_free = sizeof (buffer);
386
387   status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
388   if (status != 0)
389     return (ENOBUFS);
390
391   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
392   if (status != 0)
393     return (ENOBUFS);
394
395   assert (buffer_free < sizeof (buffer));
396   buffer_size = sizeof (buffer) - buffer_free;
397   assert (buffer[buffer_size - 1] == ' ');
398   buffer[buffer_size - 1] = '\n';
399
400   pthread_mutex_lock (&lock);
401
402   if (sd < 0)
403   {
404     pthread_mutex_unlock (&lock);
405     return (ENOTCONN);
406   }
407
408   status = swrite (buffer, buffer_size);
409   if (status != 0)
410   {
411     pthread_mutex_unlock (&lock);
412     return (status);
413   }
414
415   status = sread (buffer, sizeof (buffer));
416   if (status < 0)
417   {
418     status = errno;
419     pthread_mutex_unlock (&lock);
420     return (status);
421   }
422   else if (status == 0)
423   {
424     pthread_mutex_unlock (&lock);
425     return (ENODATA);
426   }
427
428   pthread_mutex_unlock (&lock);
429
430   status = atoi (buffer);
431   return (status);
432 } /* }}} int rrdc_flush */
433
434 /*
435  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
436  */