fix for the memory leak in info fixes the memory leak in "INFO".
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
122
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127
128 struct listen_socket_s
129 {
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
133
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
137
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
142
143   char *wbuf;
144   ssize_t wbuf_len;
145
146   uint32_t permissions;
147
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
152
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
160
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
163
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
167
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
173
174   char *syntax;
175   char *help;
176 };
177
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182   char *file;
183   char **values;
184   size_t values_num;            /* number of valid pointers */
185   size_t values_alloc;          /* number of allocated pointers */
186   time_t last_flush_time;
187   time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE  (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190   int flags;
191   pthread_cond_t  flushed;
192   cache_item_t *prev;
193   cache_item_t *next;
194 };
195
196 struct callback_flush_data_s
197 {
198   time_t now;
199   time_t abs_timeout;
200   char **keys;
201   size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
204
205 enum queue_side_e
206 {
207   HEAD,
208   TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
211
212 /* describe a set of journal files */
213 typedef struct {
214   char **files;
215   size_t files_num;
216 } journal_set;
217
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
221
222 /*
223  * Variables
224  */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
227
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
230
231 static listen_socket_t default_socket;
232
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
238
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
242
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
249
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
265
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
268
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
277
278 static int opt_no_overwrite = 0; /* default for the daemon */
279
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL;         /* current journal file handle */
287 static long  journal_size = 0;          /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
293
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
296
297 /* 
298  * Functions
299  */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303   state = FLUSHING;
304   pthread_cond_broadcast(&flush_cond);
305   pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
307
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
309 {
310   sig_common("INT");
311 } /* }}} void sig_int_handler */
312
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
314 {
315   sig_common("TERM");
316 } /* }}} void sig_term_handler */
317
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320   config_flush_at_shutdown = 1;
321   sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
323
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326   config_flush_at_shutdown = 0;
327   sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
329
330 static void install_signal_handlers(void) /* {{{ */
331 {
332   /* These structures are static, because `sigaction' behaves weird if the are
333    * overwritten.. */
334   static struct sigaction sa_int;
335   static struct sigaction sa_term;
336   static struct sigaction sa_pipe;
337   static struct sigaction sa_usr1;
338   static struct sigaction sa_usr2;
339
340   /* Install signal handlers */
341   memset (&sa_int, 0, sizeof (sa_int));
342   sa_int.sa_handler = sig_int_handler;
343   sigaction (SIGINT, &sa_int, NULL);
344
345   memset (&sa_term, 0, sizeof (sa_term));
346   sa_term.sa_handler = sig_term_handler;
347   sigaction (SIGTERM, &sa_term, NULL);
348
349   memset (&sa_pipe, 0, sizeof (sa_pipe));
350   sa_pipe.sa_handler = SIG_IGN;
351   sigaction (SIGPIPE, &sa_pipe, NULL);
352
353   memset (&sa_pipe, 0, sizeof (sa_usr1));
354   sa_usr1.sa_handler = sig_usr1_handler;
355   sigaction (SIGUSR1, &sa_usr1, NULL);
356
357   memset (&sa_usr2, 0, sizeof (sa_usr2));
358   sa_usr2.sa_handler = sig_usr2_handler;
359   sigaction (SIGUSR2, &sa_usr2, NULL);
360
361 } /* }}} void install_signal_handlers */
362
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365   int fd;
366   const char *file;
367   char *file_copy, *dir;
368
369   file = (config_pid_file != NULL)
370     ? config_pid_file
371     : LOCALSTATEDIR "/run/rrdcached.pid";
372
373   /* dirname may modify its argument */
374   file_copy = strdup(file);
375   if (file_copy == NULL)
376   {
377     fprintf(stderr, "rrdcached: strdup(): %s\n",
378         rrd_strerror(errno));
379     return -1;
380   }
381
382   dir = dirname(file_copy);
383   if (rrd_mkdir_p(dir, 0777) != 0)
384   {
385     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386         dir, rrd_strerror(errno));
387     return -1;
388   }
389
390   free(file_copy);
391
392   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393   if (fd < 0)
394     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395             action, file, rrd_strerror(errno));
396
397   return(fd);
398 } /* }}} static int open_pidfile */
399
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
402 {
403   int pid_fd;
404   pid_t pid;
405   char pid_str[16];
406
407   pid_fd = open_pidfile("open", O_RDWR);
408   if (pid_fd < 0)
409     return pid_fd;
410
411   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412     return -1;
413
414   pid = atoi(pid_str);
415   if (pid <= 0)
416     return -1;
417
418   /* another running process that we can signal COULD be
419    * a competing rrdcached */
420   if (pid != getpid() && kill(pid, 0) == 0)
421   {
422     fprintf(stderr,
423             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
427
428   lseek(pid_fd, 0, SEEK_SET);
429   if (ftruncate(pid_fd, 0) == -1)
430   {
431     fprintf(stderr,
432             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433     close(pid_fd);
434     return -1;
435   }
436
437   fprintf(stderr,
438           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439           "rrdcached: starting normally.\n", pid);
440
441   return pid_fd;
442 } /* }}} static int check_pidfile */
443
444 static int write_pidfile (int fd) /* {{{ */
445 {
446   pid_t pid;
447   FILE *fh;
448
449   pid = getpid ();
450
451   fh = fdopen (fd, "w");
452   if (fh == NULL)
453   {
454     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455     close(fd);
456     return (-1);
457   }
458
459   fprintf (fh, "%i\n", (int) pid);
460   fclose (fh);
461
462   return (0);
463 } /* }}} int write_pidfile */
464
465 static int remove_pidfile (void) /* {{{ */
466 {
467   char *file;
468   int status;
469
470   file = (config_pid_file != NULL)
471     ? config_pid_file
472     : LOCALSTATEDIR "/run/rrdcached.pid";
473
474   status = unlink (file);
475   if (status == 0)
476     return (0);
477   return (errno);
478 } /* }}} int remove_pidfile */
479
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482   char *eol;
483
484   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485                sock->next_read - sock->next_cmd);
486
487   if (eol == NULL)
488   {
489     /* no commands left, move remainder back to front of rbuf */
490     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491             sock->next_read - sock->next_cmd);
492     sock->next_read -= sock->next_cmd;
493     sock->next_cmd = 0;
494     *len = 0;
495     return NULL;
496   }
497   else
498   {
499     char *cmd = sock->rbuf + sock->next_cmd;
500     *eol = '\0';
501
502     sock->next_cmd = eol - sock->rbuf + 1;
503
504     if (eol > sock->rbuf && *(eol-1) == '\r')
505       *(--eol) = '\0'; /* handle "\r\n" EOL */
506
507     *len = eol - cmd;
508
509     return cmd;
510   }
511
512   /* NOTREACHED */
513   assert(1==0);
514 } /* }}} char *next_cmd */
515
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519   char *new_buf;
520
521   assert(sock != NULL);
522
523   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524   if (new_buf == NULL)
525   {
526     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527     return -1;
528   }
529
530   strncpy(new_buf + sock->wbuf_len, str, len + 1);
531
532   sock->wbuf = new_buf;
533   sock->wbuf_len += len;
534
535   return 0;
536 } /* }}} static int add_to_wbuf */
537
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541   va_list argp;
542   char buffer[CMD_MAX];
543   int len;
544
545   if (JOURNAL_REPLAY(sock)) return 0;
546   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
547
548   va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552   len = vsprintf(buffer, fmt, argp);
553 #endif
554   va_end(argp);
555   if (len < 0)
556   {
557     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558     return -1;
559   }
560
561   return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
563
564 static int count_lines(char *str) /* {{{ */
565 {
566   int lines = 0;
567
568   if (str != NULL)
569   {
570     while ((str = strchr(str, '\n')) != NULL)
571     {
572       ++lines;
573       ++str;
574     }
575   }
576
577   return lines;
578 } /* }}} static int count_lines */
579
580 /* send the response back to the user.
581  * returns 0 on success, -1 on error
582  * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584                           char *fmt, ...) /* {{{ */
585 {
586   va_list argp;
587   char buffer[CMD_MAX];
588   int lines;
589   ssize_t wrote;
590   int rclen, len;
591
592   if (JOURNAL_REPLAY(sock)) return rc;
593
594   if (sock->batch_start)
595   {
596     if (rc == RESP_OK)
597       return rc; /* no response on success during BATCH */
598     lines = sock->batch_cmd;
599   }
600   else if (rc == RESP_OK)
601     lines = count_lines(sock->wbuf);
602   else
603     lines = -1;
604
605   rclen = sprintf(buffer, "%d ", lines);
606   va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610   len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612   va_end(argp);
613   if (len < 0)
614     return -1;
615
616   len += rclen;
617
618   /* append the result to the wbuf, don't write to the user */
619   if (sock->batch_start)
620     return add_to_wbuf(sock, buffer, len);
621
622   /* first write must be complete */
623   if (len != write(sock->fd, buffer, len))
624   {
625     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626     return -1;
627   }
628
629   if (sock->wbuf != NULL && rc == RESP_OK)
630   {
631     wrote = 0;
632     while (wrote < sock->wbuf_len)
633     {
634       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635       if (wb <= 0)
636       {
637         RRDD_LOG(LOG_INFO, "send_response: could not write results");
638         return -1;
639       }
640       wrote += wb;
641     }
642   }
643
644   free(sock->wbuf); sock->wbuf = NULL;
645   sock->wbuf_len = 0;
646
647   return 0;
648 } /* }}} */
649
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652   ci->values = NULL;
653   ci->values_num = 0;
654   ci->values_alloc = 0;
655
656   ci->last_flush_time = when;
657   if (config_write_jitter > 0)
658     ci->last_flush_time += (rrd_random() % config_write_jitter);
659 }
660
661 /* remove_from_queue
662  * remove a "cache_item_t" item from the queue.
663  * must hold 'cache_lock' when calling this
664  */
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666 {
667   if (ci == NULL) return;
668   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669
670   if (ci->prev == NULL)
671     cache_queue_head = ci->next; /* reset head */
672   else
673     ci->prev->next = ci->next;
674
675   if (ci->next == NULL)
676     cache_queue_tail = ci->prev; /* reset the tail */
677   else
678     ci->next->prev = ci->prev;
679
680   ci->next = ci->prev = NULL;
681   ci->flags &= ~CI_FLAGS_IN_QUEUE;
682
683   pthread_mutex_lock (&stats_lock);
684   assert (stats_queue_length > 0);
685   stats_queue_length--;
686   pthread_mutex_unlock (&stats_lock);
687
688 } /* }}} static void remove_from_queue */
689
690 /* free the resources associated with the cache_item_t
691  * must hold cache_lock when calling this function
692  */
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694 {
695   if (ci == NULL) return NULL;
696
697   remove_from_queue(ci);
698
699   for (size_t i=0; i < ci->values_num; i++)
700     free(ci->values[i]);
701
702   free (ci->values);
703   free (ci->file);
704
705   /* in case anyone is waiting */
706   pthread_cond_broadcast(&ci->flushed);
707   pthread_cond_destroy(&ci->flushed);
708
709   free (ci);
710
711   return NULL;
712 } /* }}} static void *free_cache_item */
713
714 /*
715  * enqueue_cache_item:
716  * `cache_lock' must be acquired before calling this function!
717  */
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
719     queue_side_t side)
720 {
721   if (ci == NULL)
722     return (-1);
723
724   if (ci->values_num == 0)
725     return (0);
726
727   if (side == HEAD)
728   {
729     if (cache_queue_head == ci)
730       return 0;
731
732     /* remove if further down in queue */
733     remove_from_queue(ci);
734
735     ci->prev = NULL;
736     ci->next = cache_queue_head;
737     if (ci->next != NULL)
738       ci->next->prev = ci;
739     cache_queue_head = ci;
740
741     if (cache_queue_tail == NULL)
742       cache_queue_tail = cache_queue_head;
743   }
744   else /* (side == TAIL) */
745   {
746     /* We don't move values back in the list.. */
747     if (ci->flags & CI_FLAGS_IN_QUEUE)
748       return (0);
749
750     assert (ci->next == NULL);
751     assert (ci->prev == NULL);
752
753     ci->prev = cache_queue_tail;
754
755     if (cache_queue_tail == NULL)
756       cache_queue_head = ci;
757     else
758       cache_queue_tail->next = ci;
759
760     cache_queue_tail = ci;
761   }
762
763   ci->flags |= CI_FLAGS_IN_QUEUE;
764
765   pthread_cond_signal(&queue_cond);
766   pthread_mutex_lock (&stats_lock);
767   stats_queue_length++;
768   pthread_mutex_unlock (&stats_lock);
769
770   return (0);
771 } /* }}} int enqueue_cache_item */
772
773 /*
774  * tree_callback_flush:
775  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776  * while this is in progress.
777  */
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
779     gpointer data)
780 {
781   cache_item_t *ci;
782   callback_flush_data_t *cfd;
783
784   ci = (cache_item_t *) value;
785   cfd = (callback_flush_data_t *) data;
786
787   if (ci->flags & CI_FLAGS_IN_QUEUE)
788     return FALSE;
789
790   if (ci->values_num > 0
791       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
792   {
793     enqueue_cache_item (ci, TAIL);
794   }
795   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796       && (ci->values_num <= 0))
797   {
798     assert ((char *) key == ci->file);
799     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
800     {
801       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
802       return (FALSE);
803     }
804   }
805
806   return (FALSE);
807 } /* }}} gboolean tree_callback_flush */
808
809 static int flush_old_values (int max_age)
810 {
811   callback_flush_data_t cfd;
812   size_t k;
813
814   memset (&cfd, 0, sizeof (cfd));
815   /* Pass the current time as user data so that we don't need to call
816    * `time' for each node. */
817   cfd.now = time (NULL);
818   cfd.keys = NULL;
819   cfd.keys_num = 0;
820
821   if (max_age > 0)
822     cfd.abs_timeout = cfd.now - max_age;
823   else
824     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825
826   /* `tree_callback_flush' will return the keys of all values that haven't
827    * been touched in the last `config_flush_interval' seconds in `cfd'.
828    * The char*'s in this array point to the same memory as ci->file, so we
829    * don't need to free them separately. */
830   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831
832   for (k = 0; k < cfd.keys_num; k++)
833   {
834     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835     /* should never fail, since we have held the cache_lock
836      * the entire time */
837     assert(status == TRUE);
838   }
839
840   if (cfd.keys != NULL)
841   {
842     free (cfd.keys);
843     cfd.keys = NULL;
844   }
845
846   return (0);
847 } /* int flush_old_values */
848
849 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
850 {
851   struct timeval now;
852   struct timespec next_flush;
853   int status;
854
855   gettimeofday (&now, NULL);
856   next_flush.tv_sec = now.tv_sec + config_flush_interval;
857   next_flush.tv_nsec = 1000 * now.tv_usec;
858
859   pthread_mutex_lock(&cache_lock);
860
861   while (state == RUNNING)
862   {
863     gettimeofday (&now, NULL);
864     if ((now.tv_sec > next_flush.tv_sec)
865         || ((now.tv_sec == next_flush.tv_sec)
866           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
867     {
868       RRDD_LOG(LOG_DEBUG, "flushing old values");
869
870       /* Determine the time of the next cache flush. */
871       next_flush.tv_sec = now.tv_sec + config_flush_interval;
872
873       /* Flush all values that haven't been written in the last
874        * `config_write_interval' seconds. */
875       flush_old_values (config_write_interval);
876
877       /* unlock the cache while we rotate so we don't block incoming
878        * updates if the fsync() blocks on disk I/O */
879       pthread_mutex_unlock(&cache_lock);
880       journal_rotate();
881       pthread_mutex_lock(&cache_lock);
882     }
883
884     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
885     if (status != 0 && status != ETIMEDOUT)
886     {
887       RRDD_LOG (LOG_ERR, "flush_thread_main: "
888                 "pthread_cond_timedwait returned %i.", status);
889     }
890   }
891
892   if (config_flush_at_shutdown)
893     flush_old_values (-1); /* flush everything */
894
895   state = SHUTDOWN;
896
897   pthread_mutex_unlock(&cache_lock);
898
899   return NULL;
900 } /* void *flush_thread_main */
901
902 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
903 {
904   pthread_mutex_lock (&cache_lock);
905
906   while (state != SHUTDOWN
907          || (cache_queue_head != NULL && config_flush_at_shutdown))
908   {
909     cache_item_t *ci;
910     char *file;
911     char **values;
912     size_t values_num;
913     int status;
914
915     /* Now, check if there's something to store away. If not, wait until
916      * something comes in. */
917     if (cache_queue_head == NULL)
918     {
919       status = pthread_cond_wait (&queue_cond, &cache_lock);
920       if ((status != 0) && (status != ETIMEDOUT))
921       {
922         RRDD_LOG (LOG_ERR, "queue_thread_main: "
923             "pthread_cond_wait returned %i.", status);
924       }
925     }
926
927     /* Check if a value has arrived. This may be NULL if we timed out or there
928      * was an interrupt such as a signal. */
929     if (cache_queue_head == NULL)
930       continue;
931
932     ci = cache_queue_head;
933
934     /* copy the relevant parts */
935     file = strdup (ci->file);
936     if (file == NULL)
937     {
938       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
939       continue;
940     }
941
942     assert(ci->values != NULL);
943     assert(ci->values_num > 0);
944
945     values = ci->values;
946     values_num = ci->values_num;
947
948     wipe_ci_values(ci, time(NULL));
949     remove_from_queue(ci);
950
951     pthread_mutex_unlock (&cache_lock);
952
953     rrd_clear_error ();
954     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
955     if (status != 0)
956     {
957       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
958           "rrd_update_r (%s) failed with status %i. (%s)",
959           file, status, rrd_get_error());
960     }
961
962     journal_write("wrote", file);
963
964     /* Search again in the tree.  It's possible someone issued a "FORGET"
965      * while we were writing the update values. */
966     pthread_mutex_lock(&cache_lock);
967     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
968     if (ci)
969       pthread_cond_broadcast(&ci->flushed);
970     pthread_mutex_unlock(&cache_lock);
971
972     if (status == 0)
973     {
974       pthread_mutex_lock (&stats_lock);
975       stats_updates_written++;
976       stats_data_sets_written += values_num;
977       pthread_mutex_unlock (&stats_lock);
978     }
979
980     rrd_free_ptrs((void ***) &values, &values_num);
981     free(file);
982
983     pthread_mutex_lock (&cache_lock);
984   }
985   pthread_mutex_unlock (&cache_lock);
986
987   return (NULL);
988 } /* }}} void *queue_thread_main */
989
990 static int buffer_get_field (char **buffer_ret, /* {{{ */
991     size_t *buffer_size_ret, char **field_ret)
992 {
993   char *buffer;
994   size_t buffer_pos;
995   size_t buffer_size;
996   char *field;
997   size_t field_size;
998   int status;
999
1000   buffer = *buffer_ret;
1001   buffer_pos = 0;
1002   buffer_size = *buffer_size_ret;
1003   field = *buffer_ret;
1004   field_size = 0;
1005
1006   if (buffer_size <= 0)
1007     return (-1);
1008
1009   /* This is ensured by `handle_request'. */
1010   assert (buffer[buffer_size - 1] == '\0');
1011
1012   status = -1;
1013   while (buffer_pos < buffer_size)
1014   {
1015     /* Check for end-of-field or end-of-buffer */
1016     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1017     {
1018       field[field_size] = 0;
1019       field_size++;
1020       buffer_pos++;
1021       status = 0;
1022       break;
1023     }
1024     /* Handle escaped characters. */
1025     else if (buffer[buffer_pos] == '\\')
1026     {
1027       if (buffer_pos >= (buffer_size - 1))
1028         break;
1029       buffer_pos++;
1030       field[field_size] = buffer[buffer_pos];
1031       field_size++;
1032       buffer_pos++;
1033     }
1034     /* Normal operation */ 
1035     else
1036     {
1037       field[field_size] = buffer[buffer_pos];
1038       field_size++;
1039       buffer_pos++;
1040     }
1041   } /* while (buffer_pos < buffer_size) */
1042
1043   if (status != 0)
1044     return (status);
1045
1046   *buffer_ret = buffer + buffer_pos;
1047   *buffer_size_ret = buffer_size - buffer_pos;
1048   *field_ret = field;
1049
1050   return (0);
1051 } /* }}} int buffer_get_field */
1052
1053 /* if we're restricting writes to the base directory,
1054  * check whether the file falls within the dir
1055  * returns 1 if OK, otherwise 0
1056  */
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058 {
1059   assert(file != NULL);
1060
1061   if (!config_write_base_only
1062       || JOURNAL_REPLAY(sock)
1063       || config_base_dir == NULL)
1064     return 1;
1065
1066   if (strstr(file, "../") != NULL) goto err;
1067
1068   /* relative paths without "../" are ok */
1069   if (*file != '/') return 1;
1070
1071   /* file must be of the format base + "/" + <1+ char filename> */
1072   if (strlen(file) < _config_base_dir_len + 2) goto err;
1073   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074   if (*(file + _config_base_dir_len) != '/') goto err;
1075
1076   return 1;
1077
1078 err:
1079   if (sock != NULL && sock->fd >= 0)
1080     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081
1082   return 0;
1083 } /* }}} static int check_file_access */
1084
1085 /* when using a base dir, convert relative paths to absolute paths.
1086  * if necessary, modifies the "filename" pointer to point
1087  * to the new path created in "tmp".  "tmp" is provided
1088  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1089  *
1090  * this allows us to optimize for the expected case (absolute path)
1091  * with a no-op.
1092  */
1093 static void get_abs_path(char **filename, char *tmp)
1094 {
1095   assert(tmp != NULL);
1096   assert(filename != NULL && *filename != NULL);
1097
1098   if (config_base_dir == NULL || **filename == '/')
1099     return;
1100
1101   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1102   *filename = tmp;
1103 } /* }}} static int get_abs_path */
1104
1105 static int flush_file (const char *filename) /* {{{ */
1106 {
1107   cache_item_t *ci;
1108
1109   pthread_mutex_lock (&cache_lock);
1110
1111   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1112   if (ci == NULL)
1113   {
1114     pthread_mutex_unlock (&cache_lock);
1115     return (ENOENT);
1116   }
1117
1118   if (ci->values_num > 0)
1119   {
1120     /* Enqueue at head */
1121     enqueue_cache_item (ci, HEAD);
1122     pthread_cond_wait(&ci->flushed, &cache_lock);
1123   }
1124
1125   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1126    * may have been purged during our cond_wait() */
1127
1128   pthread_mutex_unlock(&cache_lock);
1129
1130   return (0);
1131 } /* }}} int flush_file */
1132
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134 {
1135   char *err = "Syntax error.\n";
1136
1137   if (cmd && cmd->syntax)
1138     err = cmd->syntax;
1139
1140   return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1142
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144 {
1145   uint64_t copy_queue_length;
1146   uint64_t copy_updates_received;
1147   uint64_t copy_flush_received;
1148   uint64_t copy_updates_written;
1149   uint64_t copy_data_sets_written;
1150   uint64_t copy_journal_bytes;
1151   uint64_t copy_journal_rotate;
1152
1153   uint64_t tree_nodes_number;
1154   uint64_t tree_depth;
1155
1156   pthread_mutex_lock (&stats_lock);
1157   copy_queue_length       = stats_queue_length;
1158   copy_updates_received   = stats_updates_received;
1159   copy_flush_received     = stats_flush_received;
1160   copy_updates_written    = stats_updates_written;
1161   copy_data_sets_written  = stats_data_sets_written;
1162   copy_journal_bytes      = stats_journal_bytes;
1163   copy_journal_rotate     = stats_journal_rotate;
1164   pthread_mutex_unlock (&stats_lock);
1165
1166   pthread_mutex_lock (&cache_lock);
1167   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1169   pthread_mutex_unlock (&cache_lock);
1170
1171   add_response_info(sock,
1172                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1173   add_response_info(sock,
1174                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175   add_response_info(sock,
1176                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177   add_response_info(sock,
1178                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179   add_response_info(sock,
1180                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185
1186   send_response(sock, RESP_OK, "Statistics follow\n");
1187
1188   return (0);
1189 } /* }}} int handle_request_stats */
1190
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192 {
1193   char *file, file_tmp[PATH_MAX];
1194   int status;
1195
1196   status = buffer_get_field (&buffer, &buffer_size, &file);
1197   if (status != 0)
1198   {
1199     return syntax_error(sock,cmd);
1200   }
1201   else
1202   {
1203     pthread_mutex_lock(&stats_lock);
1204     stats_flush_received++;
1205     pthread_mutex_unlock(&stats_lock);
1206
1207     get_abs_path(&file, file_tmp);
1208     if (!check_file_access(file, sock)) return 0;
1209
1210     status = flush_file (file);
1211     if (status == 0)
1212       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213     else if (status == ENOENT)
1214     {
1215       /* no file in our tree; see whether it exists at all */
1216       struct stat statbuf;
1217
1218       memset(&statbuf, 0, sizeof(statbuf));
1219       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1221       else
1222         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1223     }
1224     else if (status < 0)
1225       return send_response(sock, RESP_ERR, "Internal error.\n");
1226     else
1227       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1228   }
1229
1230   /* NOTREACHED */
1231   assert(1==0);
1232 } /* }}} int handle_request_flush */
1233
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235 {
1236   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237
1238   pthread_mutex_lock(&cache_lock);
1239   flush_old_values(-1);
1240   pthread_mutex_unlock(&cache_lock);
1241
1242   return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1244
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246 {
1247   int status;
1248   char *file, file_tmp[PATH_MAX];
1249   cache_item_t *ci;
1250
1251   status = buffer_get_field(&buffer, &buffer_size, &file);
1252   if (status != 0)
1253     return syntax_error(sock,cmd);
1254
1255   get_abs_path(&file, file_tmp);
1256
1257   pthread_mutex_lock(&cache_lock);
1258   ci = g_tree_lookup(cache_tree, file);
1259   if (ci == NULL)
1260   {
1261     pthread_mutex_unlock(&cache_lock);
1262     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1263   }
1264
1265   for (size_t i=0; i < ci->values_num; i++)
1266     add_response_info(sock, "%s\n", ci->values[i]);
1267
1268   pthread_mutex_unlock(&cache_lock);
1269   return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1271
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273 {
1274   int status;
1275   gboolean found;
1276   char *file, file_tmp[PATH_MAX];
1277
1278   status = buffer_get_field(&buffer, &buffer_size, &file);
1279   if (status != 0)
1280     return syntax_error(sock,cmd);
1281
1282   get_abs_path(&file, file_tmp);
1283   if (!check_file_access(file, sock)) return 0;
1284
1285   pthread_mutex_lock(&cache_lock);
1286   found = g_tree_remove(cache_tree, file);
1287   pthread_mutex_unlock(&cache_lock);
1288
1289   if (found == TRUE)
1290   {
1291     if (!JOURNAL_REPLAY(sock))
1292       journal_write("forget", file);
1293
1294     return send_response(sock, RESP_OK, "Gone!\n");
1295   }
1296   else
1297     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298
1299   /* NOTREACHED */
1300   assert(1==0);
1301 } /* }}} static int handle_request_forget */
1302
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304 {
1305   cache_item_t *ci;
1306
1307   pthread_mutex_lock(&cache_lock);
1308
1309   ci = cache_queue_head;
1310   while (ci != NULL)
1311   {
1312     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1313     ci = ci->next;
1314   }
1315
1316   pthread_mutex_unlock(&cache_lock);
1317
1318   return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1320
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322 {
1323   char *file, file_tmp[PATH_MAX];
1324   int values_num = 0;
1325   int status;
1326   char orig_buf[CMD_MAX];
1327
1328   cache_item_t *ci;
1329
1330   /* save it for the journal later */
1331   if (!JOURNAL_REPLAY(sock))
1332     strncpy(orig_buf, buffer, buffer_size);
1333
1334   status = buffer_get_field (&buffer, &buffer_size, &file);
1335   if (status != 0)
1336     return syntax_error(sock,cmd);
1337
1338   pthread_mutex_lock(&stats_lock);
1339   stats_updates_received++;
1340   pthread_mutex_unlock(&stats_lock);
1341
1342   get_abs_path(&file, file_tmp);
1343   if (!check_file_access(file, sock)) return 0;
1344
1345   pthread_mutex_lock (&cache_lock);
1346   ci = g_tree_lookup (cache_tree, file);
1347
1348   if (ci == NULL) /* {{{ */
1349   {
1350     struct stat statbuf;
1351     cache_item_t *tmp;
1352
1353     /* don't hold the lock while we setup; stat(2) might block */
1354     pthread_mutex_unlock(&cache_lock);
1355
1356     memset (&statbuf, 0, sizeof (statbuf));
1357     status = stat (file, &statbuf);
1358     if (status != 0)
1359     {
1360       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361
1362       status = errno;
1363       if (status == ENOENT)
1364         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1365       else
1366         return send_response(sock, RESP_ERR,
1367                              "stat failed with error %i.\n", status);
1368     }
1369     if (!S_ISREG (statbuf.st_mode))
1370       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371
1372     if (access(file, R_OK|W_OK) != 0)
1373       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374                            file, rrd_strerror(errno));
1375
1376     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1377     if (ci == NULL)
1378     {
1379       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380
1381       return send_response(sock, RESP_ERR, "malloc failed.\n");
1382     }
1383     memset (ci, 0, sizeof (cache_item_t));
1384
1385     ci->file = strdup (file);
1386     if (ci->file == NULL)
1387     {
1388       free (ci);
1389       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390
1391       return send_response(sock, RESP_ERR, "strdup failed.\n");
1392     }
1393
1394     wipe_ci_values(ci, now);
1395     ci->flags = CI_FLAGS_IN_TREE;
1396     pthread_cond_init(&ci->flushed, NULL);
1397
1398     pthread_mutex_lock(&cache_lock);
1399
1400     /* another UPDATE might have added this entry in the meantime */
1401     tmp = g_tree_lookup (cache_tree, file);
1402     if (tmp == NULL)
1403       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1404     else
1405     {
1406       free_cache_item (ci);
1407       ci = tmp;
1408     }
1409
1410     /* state may have changed while we were unlocked */
1411     if (state == SHUTDOWN)
1412       return -1;
1413   } /* }}} */
1414   assert (ci != NULL);
1415
1416   /* don't re-write updates in replay mode */
1417   if (!JOURNAL_REPLAY(sock))
1418     journal_write("update", orig_buf);
1419
1420   while (buffer_size > 0)
1421   {
1422     char *value;
1423     time_t stamp;
1424     char *eostamp;
1425
1426     status = buffer_get_field (&buffer, &buffer_size, &value);
1427     if (status != 0)
1428     {
1429       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1430       break;
1431     }
1432
1433     /* make sure update time is always moving forward */
1434     stamp = strtol(value, &eostamp, 10);
1435     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436     {
1437       pthread_mutex_unlock(&cache_lock);
1438       return send_response(sock, RESP_ERR,
1439                            "Cannot find timestamp in '%s'!\n", value);
1440     }
1441     else if (stamp <= ci->last_update_stamp)
1442     {
1443       pthread_mutex_unlock(&cache_lock);
1444       return send_response(sock, RESP_ERR,
1445                            "illegal attempt to update using time %ld when last"
1446                            " update time is %ld (minimum one second step)\n",
1447                            stamp, ci->last_update_stamp);
1448     }
1449     else
1450       ci->last_update_stamp = stamp;
1451
1452     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453                               &ci->values_alloc, config_alloc_chunk))
1454     {
1455       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1456       continue;
1457     }
1458
1459     values_num++;
1460   }
1461
1462   if (((now - ci->last_flush_time) >= config_write_interval)
1463       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464       && (ci->values_num > 0))
1465   {
1466     enqueue_cache_item (ci, TAIL);
1467   }
1468
1469   pthread_mutex_unlock (&cache_lock);
1470
1471   if (values_num < 1)
1472     return send_response(sock, RESP_ERR, "No values updated.\n");
1473   else
1474     return send_response(sock, RESP_OK,
1475                          "errors, enqueued %i value(s).\n", values_num);
1476
1477   /* NOTREACHED */
1478   assert(1==0);
1479
1480 } /* }}} int handle_request_update */
1481
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1483 {
1484   char *file, file_tmp[PATH_MAX];
1485   char *cf;
1486
1487   char *start_str;
1488   char *end_str;
1489   time_t start_tm;
1490   time_t end_tm;
1491
1492   unsigned long step;
1493   unsigned long ds_cnt;
1494   char **ds_namv;
1495   rrd_value_t *data;
1496
1497   int status;
1498   unsigned long i;
1499   time_t t;
1500   rrd_value_t *data_ptr;
1501
1502   file = NULL;
1503   cf = NULL;
1504   start_str = NULL;
1505   end_str = NULL;
1506
1507   /* Read the arguments */
1508   do /* while (0) */
1509   {
1510     status = buffer_get_field (&buffer, &buffer_size, &file);
1511     if (status != 0)
1512       break;
1513
1514     status = buffer_get_field (&buffer, &buffer_size, &cf);
1515     if (status != 0)
1516       break;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519     if (status != 0)
1520     {
1521       start_str = NULL;
1522       status = 0;
1523       break;
1524     }
1525
1526     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527     if (status != 0)
1528     {
1529       end_str = NULL;
1530       status = 0;
1531       break;
1532     }
1533   } while (0);
1534
1535   if (status != 0)
1536     return (syntax_error(sock,cmd));
1537
1538   get_abs_path(&file, file_tmp);
1539   if (!check_file_access(file, sock)) return 0;
1540
1541   status = flush_file (file);
1542   if ((status != 0) && (status != ENOENT))
1543     return (send_response (sock, RESP_ERR,
1544           "flush_file (%s) failed with status %i.\n", file, status));
1545
1546   t = time (NULL); /* "now" */
1547
1548   /* Parse start time */
1549   if (start_str != NULL)
1550   {
1551     char *endptr;
1552     long value;
1553
1554     endptr = NULL;
1555     errno = 0;
1556     value = strtol (start_str, &endptr, /* base = */ 0);
1557     if ((endptr == start_str) || (errno != 0))
1558       return (send_response(sock, RESP_ERR,
1559             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1560             start_str));
1561
1562     if (value > 0)
1563       start_tm = (time_t) value;
1564     else
1565       start_tm = (time_t) (t + value);
1566   }
1567   else
1568   {
1569     start_tm = t - 86400;
1570   }
1571
1572   /* Parse end time */
1573   if (end_str != NULL)
1574   {
1575     char *endptr;
1576     long value;
1577
1578     endptr = NULL;
1579     errno = 0;
1580     value = strtol (end_str, &endptr, /* base = */ 0);
1581     if ((endptr == end_str) || (errno != 0))
1582       return (send_response(sock, RESP_ERR,
1583             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1584             end_str));
1585
1586     if (value > 0)
1587       end_tm = (time_t) value;
1588     else
1589       end_tm = (time_t) (t + value);
1590   }
1591   else
1592   {
1593     end_tm = t;
1594   }
1595
1596   step = -1;
1597   ds_cnt = 0;
1598   ds_namv = NULL;
1599   data = NULL;
1600
1601   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602       &ds_cnt, &ds_namv, &data);
1603   if (status != 0)
1604     return (send_response(sock, RESP_ERR,
1605           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1606
1607   add_response_info (sock, "FlushVersion: %lu\n", 1);
1608   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610   add_response_info (sock, "Step: %lu\n", step);
1611   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1612
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614     size_t str_len = strlen (str); \
1615     if ((buffer_fill + str_len) > sizeof (buffer)) \
1616       str_len = sizeof (buffer) - buffer_fill; \
1617     if (str_len > 0) { \
1618       strncpy (buffer + buffer_fill, str, str_len); \
1619       buffer_fill += str_len; \
1620       assert (buffer_fill <= sizeof (buffer)); \
1621       if (buffer_fill == sizeof (buffer)) \
1622         buffer[buffer_fill - 1] = 0; \
1623       else \
1624         buffer[buffer_fill] = 0; \
1625     } \
1626   } while (0)
1627
1628   { /* Add list of DS names */
1629     char linebuf[1024];
1630     size_t linebuf_fill;
1631
1632     memset (linebuf, 0, sizeof (linebuf));
1633     linebuf_fill = 0;
1634     for (i = 0; i < ds_cnt; i++)
1635     {
1636       if (i > 0)
1637         SSTRCAT (linebuf, " ", linebuf_fill);
1638       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1639       rrd_freemem(ds_namv[i]);
1640     }
1641     rrd_freemem(ds_namv);
1642     add_response_info (sock, "DSName: %s\n", linebuf);
1643   }
1644
1645   /* Add the actual data */
1646   assert (step > 0);
1647   data_ptr = data;
1648   for (t = start_tm + step; t <= end_tm; t += step)
1649   {
1650     char linebuf[1024];
1651     size_t linebuf_fill;
1652     char tmp[128];
1653
1654     memset (linebuf, 0, sizeof (linebuf));
1655     linebuf_fill = 0;
1656     for (i = 0; i < ds_cnt; i++)
1657     {
1658       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659       tmp[sizeof (tmp) - 1] = 0;
1660       SSTRCAT (linebuf, tmp, linebuf_fill);
1661
1662       data_ptr++;
1663     }
1664
1665     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1666   } /* for (t) */
1667   rrd_freemem(data);
1668
1669   return (send_response (sock, RESP_OK, "Success\n"));
1670 #undef SSTRCAT
1671 } /* }}} int handle_request_fetch */
1672
1673 /* we came across a "WROTE" entry during journal replay.
1674  * throw away any values that we have accumulated for this file
1675  */
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1677 {
1678   cache_item_t *ci;
1679   const char *file = buffer;
1680
1681   pthread_mutex_lock(&cache_lock);
1682
1683   ci = g_tree_lookup(cache_tree, file);
1684   if (ci == NULL)
1685   {
1686     pthread_mutex_unlock(&cache_lock);
1687     return (0);
1688   }
1689
1690   if (ci->values)
1691     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1692
1693   wipe_ci_values(ci, now);
1694   remove_from_queue(ci);
1695
1696   pthread_mutex_unlock(&cache_lock);
1697   return (0);
1698 } /* }}} int handle_request_wrote */
1699
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1701 {
1702   char *file, file_tmp[PATH_MAX];
1703   int status;
1704   rrd_info_t *info;
1705
1706   /* obtain filename */
1707   status = buffer_get_field(&buffer, &buffer_size, &file);
1708   if (status != 0)
1709     return syntax_error(sock,cmd);
1710   /* get full pathname */
1711   get_abs_path(&file, file_tmp);
1712   if (!check_file_access(file, sock)) {
1713     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1714   }
1715   /* get data */
1716   rrd_clear_error ();
1717   info = rrd_info_r(file);
1718   if(!info) {
1719     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1720   }
1721   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1722       switch (data->type) {
1723       case RD_I_VAL:
1724           if (isnan(data->value.u_val))
1725               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1726           else
1727               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1728           break;
1729       case RD_I_CNT:
1730           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1731           break;
1732       case RD_I_INT:
1733           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1734           break;
1735       case RD_I_STR:
1736           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1737           break;
1738       case RD_I_BLO:
1739           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1740           break;
1741       }
1742   }
1743
1744   rrd_info_free(info);
1745
1746   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1747 } /* }}} static int handle_request_info  */
1748
1749 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1750 {
1751   char *i, *file, file_tmp[PATH_MAX];
1752   int status;
1753   int idx;
1754   time_t t;
1755
1756   /* obtain filename */
1757   status = buffer_get_field(&buffer, &buffer_size, &file);
1758   if (status != 0)
1759     return syntax_error(sock,cmd);
1760   /* get full pathname */
1761   get_abs_path(&file, file_tmp);
1762   if (!check_file_access(file, sock)) {
1763     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1764   }
1765
1766   status = buffer_get_field(&buffer, &buffer_size, &i);
1767   if (status != 0)
1768     return syntax_error(sock,cmd);
1769   idx = atoi(i);
1770   if(idx<0) { 
1771     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1772   }
1773
1774   /* get data */
1775   rrd_clear_error ();
1776   t = rrd_first_r(file,idx);
1777   if(t<1) {
1778     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1779   }
1780   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1781 } /* }}} static int handle_request_first  */
1782
1783
1784 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1785 {
1786   char *file, file_tmp[PATH_MAX];
1787   int status;
1788   time_t t, from_file, step;
1789   rrd_file_t * rrd_file;
1790   cache_item_t * ci;
1791   rrd_t rrd; 
1792
1793   /* obtain filename */
1794   status = buffer_get_field(&buffer, &buffer_size, &file);
1795   if (status != 0)
1796     return syntax_error(sock,cmd);
1797   /* get full pathname */
1798   get_abs_path(&file, file_tmp);
1799   if (!check_file_access(file, sock)) {
1800     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1801   }
1802   rrd_clear_error();
1803   rrd_init(&rrd);
1804   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1805   if(!rrd_file) {
1806     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1807   }
1808   from_file = rrd.live_head->last_up;
1809   step = rrd.stat_head->pdp_step;
1810   rrd_close(rrd_file);
1811   pthread_mutex_lock(&cache_lock);
1812   ci = g_tree_lookup(cache_tree, file);
1813   if (ci)
1814     t = ci->last_update_stamp;
1815   else
1816     t = from_file;
1817   pthread_mutex_unlock(&cache_lock);
1818   t -= t % step;
1819   rrd_free(&rrd);
1820   if(t<1) {
1821     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1822   }
1823   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1824 } /* }}} static int handle_request_last  */
1825
1826 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1827 {
1828   char *file, file_tmp[PATH_MAX];
1829   char *tok;
1830   int ac = 0;
1831   char *av[128];
1832   int status;
1833   unsigned long step = 300;
1834   time_t last_up = time(NULL)-10;
1835   int no_overwrite = opt_no_overwrite;
1836
1837
1838   /* obtain filename */
1839   status = buffer_get_field(&buffer, &buffer_size, &file);
1840   if (status != 0)
1841     return syntax_error(sock,cmd);
1842   /* get full pathname */
1843   get_abs_path(&file, file_tmp);
1844   if (!check_file_access(file, sock)) {
1845     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1846   }
1847   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1848
1849   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1850     if( ! strncmp(tok,"-b",2) ) {
1851       status = buffer_get_field(&buffer, &buffer_size, &tok );
1852       if (status != 0) return syntax_error(sock,cmd);
1853       last_up = (time_t) atol(tok);
1854       continue;
1855     }
1856     if( ! strncmp(tok,"-s",2) ) {
1857       status = buffer_get_field(&buffer, &buffer_size, &tok );
1858       if (status != 0) return syntax_error(sock,cmd);
1859       step = atol(tok);
1860       continue;
1861     }
1862     if( ! strncmp(tok,"-O",2) ) {
1863       no_overwrite = 1;
1864       continue;
1865     }
1866     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1867     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1868     return syntax_error(sock,cmd);
1869   }
1870   if(step<1) {
1871     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1872   }
1873   if (last_up < 3600 * 24 * 365 * 10) {
1874     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1875   }
1876
1877   rrd_clear_error ();
1878   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1879
1880   if(!status) {
1881     return send_response(sock, RESP_OK, "RRD created OK\n");
1882   }
1883   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1884 } /* }}} static int handle_request_create  */
1885
1886 /* start "BATCH" processing */
1887 static int batch_start (HANDLER_PROTO) /* {{{ */
1888 {
1889   int status;
1890   if (sock->batch_start)
1891     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1892
1893   status = send_response(sock, RESP_OK,
1894                          "Go ahead.  End with dot '.' on its own line.\n");
1895   sock->batch_start = time(NULL);
1896   sock->batch_cmd = 0;
1897
1898   return status;
1899 } /* }}} static int batch_start */
1900
1901 /* finish "BATCH" processing and return results to the client */
1902 static int batch_done (HANDLER_PROTO) /* {{{ */
1903 {
1904   assert(sock->batch_start);
1905   sock->batch_start = 0;
1906   sock->batch_cmd  = 0;
1907   return send_response(sock, RESP_OK, "errors\n");
1908 } /* }}} static int batch_done */
1909
1910 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1911 {
1912   return -1;
1913 } /* }}} static int handle_request_quit */
1914
1915 static command_t list_of_commands[] = { /* {{{ */
1916   {
1917     "UPDATE",
1918     handle_request_update,
1919     CMD_CONTEXT_ANY,
1920     "UPDATE <filename> <values> [<values> ...]\n"
1921     ,
1922     "Adds the given file to the internal cache if it is not yet known and\n"
1923     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1924     "for details.\n"
1925     "\n"
1926     "Each <values> has the following form:\n"
1927     "  <values> = <time>:<value>[:<value>[...]]\n"
1928     "See the rrdupdate(1) manpage for details.\n"
1929   },
1930   {
1931     "WROTE",
1932     handle_request_wrote,
1933     CMD_CONTEXT_JOURNAL,
1934     NULL,
1935     NULL
1936   },
1937   {
1938     "FLUSH",
1939     handle_request_flush,
1940     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1941     "FLUSH <filename>\n"
1942     ,
1943     "Adds the given filename to the head of the update queue and returns\n"
1944     "after it has been dequeued.\n"
1945   },
1946   {
1947     "FLUSHALL",
1948     handle_request_flushall,
1949     CMD_CONTEXT_CLIENT,
1950     "FLUSHALL\n"
1951     ,
1952     "Triggers writing of all pending updates.  Returns immediately.\n"
1953   },
1954   {
1955     "PENDING",
1956     handle_request_pending,
1957     CMD_CONTEXT_CLIENT,
1958     "PENDING <filename>\n"
1959     ,
1960     "Shows any 'pending' updates for a file, in order.\n"
1961     "The updates shown have not yet been written to the underlying RRD file.\n"
1962   },
1963   {
1964     "FORGET",
1965     handle_request_forget,
1966     CMD_CONTEXT_ANY,
1967     "FORGET <filename>\n"
1968     ,
1969     "Removes the file completely from the cache.\n"
1970     "Any pending updates for the file will be lost.\n"
1971   },
1972   {
1973     "QUEUE",
1974     handle_request_queue,
1975     CMD_CONTEXT_CLIENT,
1976     "QUEUE\n"
1977     ,
1978         "Shows all files in the output queue.\n"
1979     "The output is zero or more lines in the following format:\n"
1980     "(where <num_vals> is the number of values to be written)\n"
1981     "\n"
1982     "<num_vals> <filename>\n"
1983   },
1984   {
1985     "STATS",
1986     handle_request_stats,
1987     CMD_CONTEXT_CLIENT,
1988     "STATS\n"
1989     ,
1990     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1991     "a description of the values.\n"
1992   },
1993   {
1994     "HELP",
1995     handle_request_help,
1996     CMD_CONTEXT_CLIENT,
1997     "HELP [<command>]\n",
1998     NULL, /* special! */
1999   },
2000   {
2001     "BATCH",
2002     batch_start,
2003     CMD_CONTEXT_CLIENT,
2004     "BATCH\n"
2005     ,
2006     "The 'BATCH' command permits the client to initiate a bulk load\n"
2007     "   of commands to rrdcached.\n"
2008     "\n"
2009     "Usage:\n"
2010     "\n"
2011     "    client: BATCH\n"
2012     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2013     "    client: command #1\n"
2014     "    client: command #2\n"
2015     "    client: ... and so on\n"
2016     "    client: .\n"
2017     "    server: 2 errors\n"
2018     "    server: 7 message for command #7\n"
2019     "    server: 9 message for command #9\n"
2020     "\n"
2021     "For more information, consult the rrdcached(1) documentation.\n"
2022   },
2023   {
2024     ".",   /* BATCH terminator */
2025     batch_done,
2026     CMD_CONTEXT_BATCH,
2027     NULL,
2028     NULL
2029   },
2030   {
2031     "FETCH",
2032     handle_request_fetch,
2033     CMD_CONTEXT_CLIENT,
2034     "FETCH <file> <CF> [<start> [<end>]]\n"
2035     ,
2036     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2037   },
2038   {
2039     "INFO",
2040     handle_request_info,
2041     CMD_CONTEXT_CLIENT,
2042     "INFO <filename>\n",
2043     "The INFO command retrieves information about a specified RRD file.\n"
2044     "This is returned in standard rrdinfo format, a sequence of lines\n"
2045     "with the format <keyname> = <value>\n"
2046     "Note that this is the data as of the last update of the RRD file itself,\n"
2047     "not the last time data was received via rrdcached, so there may be pending\n"
2048     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2049   },
2050   {
2051     "FIRST",
2052     handle_request_first,
2053     CMD_CONTEXT_CLIENT,
2054     "FIRST <filename> <rra index>\n",
2055     "The FIRST command retrieves the first data time for a specified RRA in\n"
2056     "an RRD file.\n"
2057   },
2058   {
2059     "LAST",
2060     handle_request_last,
2061     CMD_CONTEXT_CLIENT,
2062     "LAST <filename>\n",
2063     "The LAST command retrieves the last update time for a specified RRD file.\n"
2064     "Note that this is the time of the last update of the RRD file itself, not\n"
2065     "the last time data was received via rrdcached, so there may be pending\n"
2066     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2067   },
2068   {
2069     "CREATE",
2070     handle_request_create,
2071     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2072     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2073     "The CREATE command will create an RRD file, overwriting any existing file\n"
2074     "unless the -O option is given or rrdcached was started with the -O option.\n"
2075     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2076     "not acceptable) and the step is in seconds (default is 300).\n"
2077     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2078   },
2079   {
2080     "QUIT",
2081     handle_request_quit,
2082     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2083     "QUIT\n"
2084     ,
2085     "Disconnect from rrdcached.\n"
2086   }
2087 }; /* }}} command_t list_of_commands[] */
2088 static size_t list_of_commands_len = sizeof (list_of_commands)
2089   / sizeof (list_of_commands[0]);
2090
2091 static command_t *find_command(char *cmd)
2092 {
2093   size_t i;
2094
2095   for (i = 0; i < list_of_commands_len; i++)
2096     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2097       return (&list_of_commands[i]);
2098   return NULL;
2099 }
2100
2101 /* We currently use the index in the `list_of_commands' array as a bit position
2102  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2103  * outside these functions so that switching to a more elegant storage method
2104  * is easily possible. */
2105 static ssize_t find_command_index (const char *cmd) /* {{{ */
2106 {
2107   size_t i;
2108
2109   for (i = 0; i < list_of_commands_len; i++)
2110     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2111       return ((ssize_t) i);
2112   return (-1);
2113 } /* }}} ssize_t find_command_index */
2114
2115 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2116     const char *cmd)
2117 {
2118   ssize_t i;
2119
2120   if (JOURNAL_REPLAY(sock))
2121     return (1);
2122
2123   if (cmd == NULL)
2124     return (-1);
2125
2126   if ((strcasecmp ("QUIT", cmd) == 0)
2127       || (strcasecmp ("HELP", cmd) == 0))
2128     return (1);
2129   else if (strcmp (".", cmd) == 0)
2130     cmd = "BATCH";
2131
2132   i = find_command_index (cmd);
2133   if (i < 0)
2134     return (-1);
2135   assert (i < 32);
2136
2137   if ((sock->permissions & (1 << i)) != 0)
2138     return (1);
2139   return (0);
2140 } /* }}} int socket_permission_check */
2141
2142 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2143     const char *cmd)
2144 {
2145   ssize_t i;
2146
2147   i = find_command_index (cmd);
2148   if (i < 0)
2149     return (-1);
2150   assert (i < 32);
2151
2152   sock->permissions |= (1 << i);
2153   return (0);
2154 } /* }}} int socket_permission_add */
2155
2156 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2157 {
2158   sock->permissions = 0;
2159 } /* }}} socket_permission_clear */
2160
2161 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2162     listen_socket_t *src)
2163 {
2164   dest->permissions = src->permissions;
2165 } /* }}} socket_permission_copy */
2166
2167 /* check whether commands are received in the expected context */
2168 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2169 {
2170   if (JOURNAL_REPLAY(sock))
2171     return (cmd->context & CMD_CONTEXT_JOURNAL);
2172   else if (sock->batch_start)
2173     return (cmd->context & CMD_CONTEXT_BATCH);
2174   else
2175     return (cmd->context & CMD_CONTEXT_CLIENT);
2176
2177   /* NOTREACHED */
2178   assert(1==0);
2179 }
2180
2181 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2182 {
2183   int status;
2184   char *cmd_str;
2185   char *resp_txt;
2186   command_t *help = NULL;
2187
2188   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2189   if (status == 0)
2190     help = find_command(cmd_str);
2191
2192   if (help && (help->syntax || help->help))
2193   {
2194     char tmp[CMD_MAX];
2195
2196     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2197     resp_txt = tmp;
2198
2199     if (help->syntax)
2200       add_response_info(sock, "Usage: %s\n", help->syntax);
2201
2202     if (help->help)
2203       add_response_info(sock, "%s\n", help->help);
2204   }
2205   else
2206   {
2207     size_t i;
2208
2209     resp_txt = "Command overview\n";
2210
2211     for (i = 0; i < list_of_commands_len; i++)
2212     {
2213       if (list_of_commands[i].syntax == NULL)
2214         continue;
2215       add_response_info (sock, "%s", list_of_commands[i].syntax);
2216     }
2217   }
2218
2219   return send_response(sock, RESP_OK, resp_txt);
2220 } /* }}} int handle_request_help */
2221
2222 static int handle_request (DISPATCH_PROTO) /* {{{ */
2223 {
2224   char *buffer_ptr = buffer;
2225   char *cmd_str = NULL;
2226   command_t *cmd = NULL;
2227   int status;
2228
2229   assert (buffer[buffer_size - 1] == '\0');
2230
2231   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2232   if (status != 0)
2233   {
2234     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2235     return (-1);
2236   }
2237
2238   if (sock != NULL && sock->batch_start)
2239     sock->batch_cmd++;
2240
2241   cmd = find_command(cmd_str);
2242   if (!cmd)
2243     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2244
2245   if (!socket_permission_check (sock, cmd->cmd))
2246     return send_response(sock, RESP_ERR, "Permission denied.\n");
2247
2248   if (!command_check_context(sock, cmd))
2249     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2250
2251   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2252 } /* }}} int handle_request */
2253
2254 static void journal_set_free (journal_set *js) /* {{{ */
2255 {
2256   if (js == NULL)
2257     return;
2258
2259   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2260
2261   free(js);
2262 } /* }}} journal_set_free */
2263
2264 static void journal_set_remove (journal_set *js) /* {{{ */
2265 {
2266   if (js == NULL)
2267     return;
2268
2269   for (uint i=0; i < js->files_num; i++)
2270   {
2271     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2272     unlink(js->files[i]);
2273   }
2274 } /* }}} journal_set_remove */
2275
2276 /* close current journal file handle.
2277  * MUST hold journal_lock before calling */
2278 static void journal_close(void) /* {{{ */
2279 {
2280   if (journal_fh != NULL)
2281   {
2282     if (fclose(journal_fh) != 0)
2283       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2284   }
2285
2286   journal_fh = NULL;
2287   journal_size = 0;
2288 } /* }}} journal_close */
2289
2290 /* MUST hold journal_lock before calling */
2291 static void journal_new_file(void) /* {{{ */
2292 {
2293   struct timeval now;
2294   int  new_fd;
2295   char new_file[PATH_MAX + 1];
2296
2297   assert(journal_dir != NULL);
2298   assert(journal_cur != NULL);
2299
2300   journal_close();
2301
2302   gettimeofday(&now, NULL);
2303   /* this format assures that the files sort in strcmp() order */
2304   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2305            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2306
2307   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2308                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2309   if (new_fd < 0)
2310     goto error;
2311
2312   journal_fh = fdopen(new_fd, "a");
2313   if (journal_fh == NULL)
2314     goto error;
2315
2316   journal_size = ftell(journal_fh);
2317   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2318
2319   /* record the file in the journal set */
2320   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2321
2322   return;
2323
2324 error:
2325   RRDD_LOG(LOG_CRIT,
2326            "JOURNALING DISABLED: Error while trying to create %s : %s",
2327            new_file, rrd_strerror(errno));
2328   RRDD_LOG(LOG_CRIT,
2329            "JOURNALING DISABLED: All values will be flushed at shutdown");
2330
2331   close(new_fd);
2332   config_flush_at_shutdown = 1;
2333
2334 } /* }}} journal_new_file */
2335
2336 /* MUST NOT hold journal_lock before calling this */
2337 static void journal_rotate(void) /* {{{ */
2338 {
2339   journal_set *old_js = NULL;
2340
2341   if (journal_dir == NULL)
2342     return;
2343
2344   RRDD_LOG(LOG_DEBUG, "rotating journals");
2345
2346   pthread_mutex_lock(&stats_lock);
2347   ++stats_journal_rotate;
2348   pthread_mutex_unlock(&stats_lock);
2349
2350   pthread_mutex_lock(&journal_lock);
2351
2352   journal_close();
2353
2354   /* rotate the journal sets */
2355   old_js = journal_old;
2356   journal_old = journal_cur;
2357   journal_cur = calloc(1, sizeof(journal_set));
2358
2359   if (journal_cur != NULL)
2360     journal_new_file();
2361   else
2362     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2363
2364   pthread_mutex_unlock(&journal_lock);
2365
2366   journal_set_remove(old_js);
2367   journal_set_free  (old_js);
2368
2369 } /* }}} static void journal_rotate */
2370
2371 /* MUST hold journal_lock when calling */
2372 static void journal_done(void) /* {{{ */
2373 {
2374   if (journal_cur == NULL)
2375     return;
2376
2377   journal_close();
2378
2379   if (config_flush_at_shutdown)
2380   {
2381     RRDD_LOG(LOG_INFO, "removing journals");
2382     journal_set_remove(journal_old);
2383     journal_set_remove(journal_cur);
2384   }
2385   else
2386   {
2387     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2388              "journals will be used at next startup");
2389   }
2390
2391   journal_set_free(journal_cur);
2392   journal_set_free(journal_old);
2393   free(journal_dir);
2394
2395 } /* }}} static void journal_done */
2396
2397 static int journal_write(char *cmd, char *args) /* {{{ */
2398 {
2399   int chars;
2400
2401   if (journal_fh == NULL)
2402     return 0;
2403
2404   pthread_mutex_lock(&journal_lock);
2405   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2406   journal_size += chars;
2407
2408   if (journal_size > JOURNAL_MAX)
2409     journal_new_file();
2410
2411   pthread_mutex_unlock(&journal_lock);
2412
2413   if (chars > 0)
2414   {
2415     pthread_mutex_lock(&stats_lock);
2416     stats_journal_bytes += chars;
2417     pthread_mutex_unlock(&stats_lock);
2418   }
2419
2420   return chars;
2421 } /* }}} static int journal_write */
2422
2423 static int journal_replay (const char *file) /* {{{ */
2424 {
2425   FILE *fh;
2426   int entry_cnt = 0;
2427   int fail_cnt = 0;
2428   uint64_t line = 0;
2429   char entry[CMD_MAX];
2430   time_t now;
2431
2432   if (file == NULL) return 0;
2433
2434   {
2435     char *reason = "unknown error";
2436     int status = 0;
2437     struct stat statbuf;
2438
2439     memset(&statbuf, 0, sizeof(statbuf));
2440     if (stat(file, &statbuf) != 0)
2441     {
2442       reason = "stat error";
2443       status = errno;
2444     }
2445     else if (!S_ISREG(statbuf.st_mode))
2446     {
2447       reason = "not a regular file";
2448       status = EPERM;
2449     }
2450     if (statbuf.st_uid != daemon_uid)
2451     {
2452       reason = "not owned by daemon user";
2453       status = EACCES;
2454     }
2455     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2456     {
2457       reason = "must not be user/group writable";
2458       status = EACCES;
2459     }
2460
2461     if (status != 0)
2462     {
2463       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2464                file, rrd_strerror(status), reason);
2465       return 0;
2466     }
2467   }
2468
2469   fh = fopen(file, "r");
2470   if (fh == NULL)
2471   {
2472     if (errno != ENOENT)
2473       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2474                file, rrd_strerror(errno));
2475     return 0;
2476   }
2477   else
2478     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2479
2480   now = time(NULL);
2481
2482   while(!feof(fh))
2483   {
2484     size_t entry_len;
2485
2486     ++line;
2487     if (fgets(entry, sizeof(entry), fh) == NULL)
2488       break;
2489     entry_len = strlen(entry);
2490
2491     /* check \n termination in case journal writing crashed mid-line */
2492     if (entry_len == 0)
2493       continue;
2494     else if (entry[entry_len - 1] != '\n')
2495     {
2496       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2497       ++fail_cnt;
2498       continue;
2499     }
2500
2501     entry[entry_len - 1] = '\0';
2502
2503     if (handle_request(NULL, now, entry, entry_len) == 0)
2504       ++entry_cnt;
2505     else
2506       ++fail_cnt;
2507   }
2508
2509   fclose(fh);
2510
2511   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2512            entry_cnt, fail_cnt);
2513
2514   return entry_cnt > 0 ? 1 : 0;
2515 } /* }}} static int journal_replay */
2516
2517 static int journal_sort(const void *v1, const void *v2)
2518 {
2519   char **jn1 = (char **) v1;
2520   char **jn2 = (char **) v2;
2521
2522   return strcmp(*jn1,*jn2);
2523 }
2524
2525 static void journal_init(void) /* {{{ */
2526 {
2527   int had_journal = 0;
2528   DIR *dir;
2529   struct dirent *dent;
2530   char path[PATH_MAX+1];
2531
2532   if (journal_dir == NULL) return;
2533
2534   pthread_mutex_lock(&journal_lock);
2535
2536   journal_cur = calloc(1, sizeof(journal_set));
2537   if (journal_cur == NULL)
2538   {
2539     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2540     return;
2541   }
2542
2543   RRDD_LOG(LOG_INFO, "checking for journal files");
2544
2545   /* Handle old journal files during transition.  This gives them the
2546    * correct sort order.  TODO: remove after first release
2547    */
2548   {
2549     char old_path[PATH_MAX+1];
2550     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2551     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2552     rename(old_path, path);
2553
2554     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2555     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2556     rename(old_path, path);
2557   }
2558
2559   dir = opendir(journal_dir);
2560   if (!dir) {
2561     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2562     return;
2563   }
2564   while ((dent = readdir(dir)) != NULL)
2565   {
2566     /* looks like a journal file? */
2567     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2568       continue;
2569
2570     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2571
2572     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2573     {
2574       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2575                dent->d_name);
2576       break;
2577     }
2578   }
2579   closedir(dir);
2580
2581   qsort(journal_cur->files, journal_cur->files_num,
2582         sizeof(journal_cur->files[0]), journal_sort);
2583
2584   for (uint i=0; i < journal_cur->files_num; i++)
2585     had_journal += journal_replay(journal_cur->files[i]);
2586
2587   journal_new_file();
2588
2589   /* it must have been a crash.  start a flush */
2590   if (had_journal && config_flush_at_shutdown)
2591     flush_old_values(-1);
2592
2593   pthread_mutex_unlock(&journal_lock);
2594
2595   RRDD_LOG(LOG_INFO, "journal processing complete");
2596
2597 } /* }}} static void journal_init */
2598
2599 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2600 {
2601   assert(sock != NULL);
2602
2603   free(sock->rbuf);  sock->rbuf = NULL;
2604   free(sock->wbuf);  sock->wbuf = NULL;
2605   free(sock);
2606 } /* }}} void free_listen_socket */
2607
2608 static void close_connection(listen_socket_t *sock) /* {{{ */
2609 {
2610   if (sock->fd >= 0)
2611   {
2612     close(sock->fd);
2613     sock->fd = -1;
2614   }
2615
2616   free_listen_socket(sock);
2617
2618 } /* }}} void close_connection */
2619
2620 static void *connection_thread_main (void *args) /* {{{ */
2621 {
2622   listen_socket_t *sock;
2623   int fd;
2624
2625   sock = (listen_socket_t *) args;
2626   fd = sock->fd;
2627
2628   /* init read buffers */
2629   sock->next_read = sock->next_cmd = 0;
2630   sock->rbuf = malloc(RBUF_SIZE);
2631   if (sock->rbuf == NULL)
2632   {
2633     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2634     close_connection(sock);
2635     return NULL;
2636   }
2637
2638   pthread_mutex_lock (&connection_threads_lock);
2639   connection_threads_num++;
2640   pthread_mutex_unlock (&connection_threads_lock);
2641
2642   while (state == RUNNING)
2643   {
2644     char *cmd;
2645     ssize_t cmd_len;
2646     ssize_t rbytes;
2647     time_t now;
2648
2649     struct pollfd pollfd;
2650     int status;
2651
2652     pollfd.fd = fd;
2653     pollfd.events = POLLIN | POLLPRI;
2654     pollfd.revents = 0;
2655
2656     status = poll (&pollfd, 1, /* timeout = */ 500);
2657     if (state != RUNNING)
2658       break;
2659     else if (status == 0) /* timeout */
2660       continue;
2661     else if (status < 0) /* error */
2662     {
2663       status = errno;
2664       if (status != EINTR)
2665         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2666       continue;
2667     }
2668
2669     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2670       break;
2671     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2672     {
2673       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2674           "poll(2) returned something unexpected: %#04hx",
2675           pollfd.revents);
2676       break;
2677     }
2678
2679     rbytes = read(fd, sock->rbuf + sock->next_read,
2680                   RBUF_SIZE - sock->next_read);
2681     if (rbytes < 0)
2682     {
2683       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2684       break;
2685     }
2686     else if (rbytes == 0)
2687       break; /* eof */
2688
2689     sock->next_read += rbytes;
2690
2691     if (sock->batch_start)
2692       now = sock->batch_start;
2693     else
2694       now = time(NULL);
2695
2696     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2697     {
2698       status = handle_request (sock, now, cmd, cmd_len+1);
2699       if (status != 0)
2700         goto out_close;
2701     }
2702   }
2703
2704 out_close:
2705   close_connection(sock);
2706
2707   /* Remove this thread from the connection threads list */
2708   pthread_mutex_lock (&connection_threads_lock);
2709   connection_threads_num--;
2710   if (connection_threads_num <= 0)
2711     pthread_cond_broadcast(&connection_threads_done);
2712   pthread_mutex_unlock (&connection_threads_lock);
2713
2714   return (NULL);
2715 } /* }}} void *connection_thread_main */
2716
2717 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2718 {
2719   int fd;
2720   struct sockaddr_un sa;
2721   listen_socket_t *temp;
2722   int status;
2723   const char *path;
2724   char *path_copy, *dir;
2725
2726   path = sock->addr;
2727   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2728     path += strlen("unix:");
2729
2730   /* dirname may modify its argument */
2731   path_copy = strdup(path);
2732   if (path_copy == NULL)
2733   {
2734     fprintf(stderr, "rrdcached: strdup(): %s\n",
2735         rrd_strerror(errno));
2736     return (-1);
2737   }
2738
2739   dir = dirname(path_copy);
2740   if (rrd_mkdir_p(dir, 0777) != 0)
2741   {
2742     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2743         dir, rrd_strerror(errno));
2744     return (-1);
2745   }
2746
2747   free(path_copy);
2748
2749   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2750       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2751   if (temp == NULL)
2752   {
2753     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2754     return (-1);
2755   }
2756   listen_fds = temp;
2757   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2758
2759   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2760   if (fd < 0)
2761   {
2762     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2763              rrd_strerror(errno));
2764     return (-1);
2765   }
2766
2767   memset (&sa, 0, sizeof (sa));
2768   sa.sun_family = AF_UNIX;
2769   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2770
2771   /* if we've gotten this far, we own the pid file.  any daemon started
2772    * with the same args must not be alive.  therefore, ensure that we can
2773    * create the socket...
2774    */
2775   unlink(path);
2776
2777   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2778   if (status != 0)
2779   {
2780     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2781              path, rrd_strerror(errno));
2782     close (fd);
2783     return (-1);
2784   }
2785
2786   /* tweak the sockets group ownership */
2787   if (sock->socket_group != (gid_t)-1)
2788   {
2789     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2790          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2791     {
2792       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2793     }
2794   }
2795
2796   if (sock->socket_permissions != (mode_t)-1)
2797   {
2798     if (chmod(path, sock->socket_permissions) != 0)
2799       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2800           (unsigned int)sock->socket_permissions, strerror(errno));
2801   }
2802
2803   status = listen (fd, /* backlog = */ 10);
2804   if (status != 0)
2805   {
2806     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2807              path, rrd_strerror(errno));
2808     close (fd);
2809     unlink (path);
2810     return (-1);
2811   }
2812
2813   listen_fds[listen_fds_num].fd = fd;
2814   listen_fds[listen_fds_num].family = PF_UNIX;
2815   strncpy(listen_fds[listen_fds_num].addr, path,
2816           sizeof (listen_fds[listen_fds_num].addr) - 1);
2817   listen_fds_num++;
2818
2819   return (0);
2820 } /* }}} int open_listen_socket_unix */
2821
2822 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2823 {
2824   struct addrinfo ai_hints;
2825   struct addrinfo *ai_res;
2826   struct addrinfo *ai_ptr;
2827   char addr_copy[NI_MAXHOST];
2828   char *addr;
2829   char *port;
2830   int status;
2831
2832   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2833   addr_copy[sizeof (addr_copy) - 1] = 0;
2834   addr = addr_copy;
2835
2836   memset (&ai_hints, 0, sizeof (ai_hints));
2837   ai_hints.ai_flags = 0;
2838 #ifdef AI_ADDRCONFIG
2839   ai_hints.ai_flags |= AI_ADDRCONFIG;
2840 #endif
2841   ai_hints.ai_family = AF_UNSPEC;
2842   ai_hints.ai_socktype = SOCK_STREAM;
2843
2844   port = NULL;
2845   if (*addr == '[') /* IPv6+port format */
2846   {
2847     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2848     addr++;
2849
2850     port = strchr (addr, ']');
2851     if (port == NULL)
2852     {
2853       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2854       return (-1);
2855     }
2856     *port = 0;
2857     port++;
2858
2859     if (*port == ':')
2860       port++;
2861     else if (*port == 0)
2862       port = NULL;
2863     else
2864     {
2865       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2866       return (-1);
2867     }
2868   } /* if (*addr == '[') */
2869   else
2870   {
2871     port = rindex(addr, ':');
2872     if (port != NULL)
2873     {
2874       *port = 0;
2875       port++;
2876     }
2877   }
2878   ai_res = NULL;
2879   status = getaddrinfo (addr,
2880                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2881                         &ai_hints, &ai_res);
2882   if (status != 0)
2883   {
2884     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2885              addr, gai_strerror (status));
2886     return (-1);
2887   }
2888
2889   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2890   {
2891     int fd;
2892     listen_socket_t *temp;
2893     int one = 1;
2894
2895     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2896         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2897     if (temp == NULL)
2898     {
2899       fprintf (stderr,
2900                "rrdcached: open_listen_socket_network: realloc failed.\n");
2901       continue;
2902     }
2903     listen_fds = temp;
2904     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2905
2906     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2907     if (fd < 0)
2908     {
2909       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2910                rrd_strerror(errno));
2911       continue;
2912     }
2913
2914     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2915
2916     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2917     if (status != 0)
2918     {
2919       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2920                sock->addr, rrd_strerror(errno));
2921       close (fd);
2922       continue;
2923     }
2924
2925     status = listen (fd, /* backlog = */ 10);
2926     if (status != 0)
2927     {
2928       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2929                sock->addr, rrd_strerror(errno));
2930       close (fd);
2931       freeaddrinfo(ai_res);
2932       return (-1);
2933     }
2934
2935     listen_fds[listen_fds_num].fd = fd;
2936     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2937     listen_fds_num++;
2938   } /* for (ai_ptr) */
2939
2940   freeaddrinfo(ai_res);
2941   return (0);
2942 } /* }}} static int open_listen_socket_network */
2943
2944 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2945 {
2946   assert(sock != NULL);
2947   assert(sock->addr != NULL);
2948
2949   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2950       || sock->addr[0] == '/')
2951     return (open_listen_socket_unix(sock));
2952   else
2953     return (open_listen_socket_network(sock));
2954 } /* }}} int open_listen_socket */
2955
2956 static int close_listen_sockets (void) /* {{{ */
2957 {
2958   size_t i;
2959
2960   for (i = 0; i < listen_fds_num; i++)
2961   {
2962     close (listen_fds[i].fd);
2963
2964     if (listen_fds[i].family == PF_UNIX)
2965       unlink(listen_fds[i].addr);
2966   }
2967
2968   free (listen_fds);
2969   listen_fds = NULL;
2970   listen_fds_num = 0;
2971
2972   return (0);
2973 } /* }}} int close_listen_sockets */
2974
2975 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2976 {
2977   struct pollfd *pollfds;
2978   int pollfds_num;
2979   int status;
2980   int i;
2981
2982   if (listen_fds_num < 1)
2983   {
2984     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2985     return (NULL);
2986   }
2987
2988   pollfds_num = listen_fds_num;
2989   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2990   if (pollfds == NULL)
2991   {
2992     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2993     return (NULL);
2994   }
2995   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2996
2997   RRDD_LOG(LOG_INFO, "listening for connections");
2998
2999   while (state == RUNNING)
3000   {
3001     for (i = 0; i < pollfds_num; i++)
3002     {
3003       pollfds[i].fd = listen_fds[i].fd;
3004       pollfds[i].events = POLLIN | POLLPRI;
3005       pollfds[i].revents = 0;
3006     }
3007
3008     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3009     if (state != RUNNING)
3010       break;
3011     else if (status == 0) /* timeout */
3012       continue;
3013     else if (status < 0) /* error */
3014     {
3015       status = errno;
3016       if (status != EINTR)
3017       {
3018         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3019       }
3020       continue;
3021     }
3022
3023     for (i = 0; i < pollfds_num; i++)
3024     {
3025       listen_socket_t *client_sock;
3026       struct sockaddr_storage client_sa;
3027       socklen_t client_sa_size;
3028       pthread_t tid;
3029       pthread_attr_t attr;
3030
3031       if (pollfds[i].revents == 0)
3032         continue;
3033
3034       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3035       {
3036         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3037             "poll(2) returned something unexpected for listen FD #%i.",
3038             pollfds[i].fd);
3039         continue;
3040       }
3041
3042       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3043       if (client_sock == NULL)
3044       {
3045         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3046         continue;
3047       }
3048       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3049
3050       client_sa_size = sizeof (client_sa);
3051       client_sock->fd = accept (pollfds[i].fd,
3052           (struct sockaddr *) &client_sa, &client_sa_size);
3053       if (client_sock->fd < 0)
3054       {
3055         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3056         free(client_sock);
3057         continue;
3058       }
3059
3060       pthread_attr_init (&attr);
3061       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3062
3063       status = pthread_create (&tid, &attr, connection_thread_main,
3064                                client_sock);
3065       if (status != 0)
3066       {
3067         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3068         close_connection(client_sock);
3069         continue;
3070       }
3071     } /* for (pollfds_num) */
3072   } /* while (state == RUNNING) */
3073
3074   RRDD_LOG(LOG_INFO, "starting shutdown");
3075
3076   close_listen_sockets ();
3077
3078   pthread_mutex_lock (&connection_threads_lock);
3079   while (connection_threads_num > 0)
3080     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3081   pthread_mutex_unlock (&connection_threads_lock);
3082
3083   free(pollfds);
3084
3085   return (NULL);
3086 } /* }}} void *listen_thread_main */
3087
3088 static int daemonize (void) /* {{{ */
3089 {
3090   int pid_fd;
3091   char *base_dir;
3092
3093   daemon_uid = geteuid();
3094
3095   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3096   if (pid_fd < 0)
3097     pid_fd = check_pidfile();
3098   if (pid_fd < 0)
3099     return pid_fd;
3100
3101   /* open all the listen sockets */
3102   if (config_listen_address_list_len > 0)
3103   {
3104     for (size_t i = 0; i < config_listen_address_list_len; i++)
3105       open_listen_socket (config_listen_address_list[i]);
3106
3107     rrd_free_ptrs((void ***) &config_listen_address_list,
3108                   &config_listen_address_list_len);
3109   }
3110   else
3111   {
3112     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3113         sizeof(default_socket.addr) - 1);
3114     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3115     open_listen_socket (&default_socket);
3116   }
3117
3118   if (listen_fds_num < 1)
3119   {
3120     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3121     goto error;
3122   }
3123
3124   if (!stay_foreground)
3125   {
3126     pid_t child;
3127
3128     child = fork ();
3129     if (child < 0)
3130     {
3131       fprintf (stderr, "daemonize: fork(2) failed.\n");
3132       goto error;
3133     }
3134     else if (child > 0)
3135       exit(0);
3136
3137     /* Become session leader */
3138     setsid ();
3139
3140     /* Open the first three file descriptors to /dev/null */
3141     close (2);
3142     close (1);
3143     close (0);
3144
3145     open ("/dev/null", O_RDWR);
3146     if (dup(0) == -1 || dup(0) == -1){
3147         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3148     }
3149   } /* if (!stay_foreground) */
3150
3151   /* Change into the /tmp directory. */
3152   base_dir = (config_base_dir != NULL)
3153     ? config_base_dir
3154     : "/tmp";
3155
3156   if (chdir (base_dir) != 0)
3157   {
3158     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3159     goto error;
3160   }
3161
3162   install_signal_handlers();
3163
3164   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3165   RRDD_LOG(LOG_INFO, "starting up");
3166
3167   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3168                                 (GDestroyNotify) free_cache_item);
3169   if (cache_tree == NULL)
3170   {
3171     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3172     goto error;
3173   }
3174
3175   return write_pidfile (pid_fd);
3176
3177 error:
3178   remove_pidfile();
3179   return -1;
3180 } /* }}} int daemonize */
3181
3182 static int cleanup (void) /* {{{ */
3183 {
3184   pthread_cond_broadcast (&flush_cond);
3185   pthread_join (flush_thread, NULL);
3186
3187   pthread_cond_broadcast (&queue_cond);
3188   for (int i = 0; i < config_queue_threads; i++)
3189     pthread_join (queue_threads[i], NULL);
3190
3191   if (config_flush_at_shutdown)
3192   {
3193     assert(cache_queue_head == NULL);
3194     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3195   }
3196
3197   free(queue_threads);
3198   free(config_base_dir);
3199
3200   pthread_mutex_lock(&cache_lock);
3201   g_tree_destroy(cache_tree);
3202
3203   pthread_mutex_lock(&journal_lock);
3204   journal_done();
3205
3206   RRDD_LOG(LOG_INFO, "goodbye");
3207   closelog ();
3208
3209   remove_pidfile ();
3210   free(config_pid_file);
3211
3212   return (0);
3213 } /* }}} int cleanup */
3214
3215 static int read_options (int argc, char **argv) /* {{{ */
3216 {
3217   int option;
3218   int status = 0;
3219
3220   socket_permission_clear (&default_socket);
3221
3222   default_socket.socket_group = (gid_t)-1;
3223   default_socket.socket_permissions = (mode_t)-1;
3224
3225   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3226   {
3227     switch (option)
3228     {
3229       case 'O':
3230         opt_no_overwrite = 1;
3231         break;
3232
3233       case 'g':
3234         stay_foreground=1;
3235         break;
3236
3237       case 'l':
3238       {
3239         listen_socket_t *new;
3240
3241         new = malloc(sizeof(listen_socket_t));
3242         if (new == NULL)
3243         {
3244           fprintf(stderr, "read_options: malloc failed.\n");
3245           return(2);
3246         }
3247         memset(new, 0, sizeof(listen_socket_t));
3248
3249         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3250
3251         /* Add permissions to the socket {{{ */
3252         if (default_socket.permissions != 0)
3253         {
3254           socket_permission_copy (new, &default_socket);
3255         }
3256         else /* if (default_socket.permissions == 0) */
3257         {
3258           /* Add permission for ALL commands to the socket. */
3259           size_t i;
3260           for (i = 0; i < list_of_commands_len; i++)
3261           {
3262             status = socket_permission_add (new, list_of_commands[i].cmd);
3263             if (status != 0)
3264             {
3265               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3266                   "socket failed. This should never happen, ever! Sorry.\n",
3267                   list_of_commands[i].cmd);
3268               status = 4;
3269             }
3270           }
3271         }
3272         /* }}} Done adding permissions. */
3273
3274         new->socket_group = default_socket.socket_group;
3275         new->socket_permissions = default_socket.socket_permissions;
3276
3277         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3278                          &config_listen_address_list_len, new))
3279         {
3280           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3281           return (2);
3282         }
3283       }
3284       break;
3285
3286       /* set socket group permissions */
3287       case 's':
3288       {
3289         gid_t group_gid;
3290         struct group *grp;
3291
3292         group_gid = strtoul(optarg, NULL, 10);
3293         if (errno != EINVAL && group_gid>0)
3294         {
3295           /* we were passed a number */
3296           grp = getgrgid(group_gid);
3297         }
3298         else
3299         {
3300           grp = getgrnam(optarg);
3301         }
3302
3303         if (grp)
3304         {
3305           default_socket.socket_group = grp->gr_gid;
3306         }
3307         else
3308         {
3309           /* no idea what the user wanted... */
3310           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3311           return (5);
3312         }
3313       }
3314       break;
3315
3316       /* set socket file permissions */
3317       case 'm':
3318       {
3319         long  tmp;
3320         char *endptr = NULL;
3321
3322         tmp = strtol (optarg, &endptr, 8);
3323         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3324             || (tmp > 07777) || (tmp < 0)) {
3325           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3326               optarg);
3327           return (5);
3328         }
3329
3330         default_socket.socket_permissions = (mode_t)tmp;
3331       }
3332       break;
3333
3334       case 'P':
3335       {
3336         char *optcopy;
3337         char *saveptr;
3338         char *dummy;
3339         char *ptr;
3340
3341         socket_permission_clear (&default_socket);
3342
3343         optcopy = strdup (optarg);
3344         dummy = optcopy;
3345         saveptr = NULL;
3346         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3347         {
3348           dummy = NULL;
3349           status = socket_permission_add (&default_socket, ptr);
3350           if (status != 0)
3351           {
3352             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3353                 "socket failed. Most likely, this permission doesn't "
3354                 "exist. Check your command line.\n", ptr);
3355             status = 4;
3356           }
3357         }
3358
3359         free (optcopy);
3360       }
3361       break;
3362
3363       case 'f':
3364       {
3365         int temp;
3366
3367         temp = atoi (optarg);
3368         if (temp > 0)
3369           config_flush_interval = temp;
3370         else
3371         {
3372           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3373           status = 3;
3374         }
3375       }
3376       break;
3377
3378       case 'w':
3379       {
3380         int temp;
3381
3382         temp = atoi (optarg);
3383         if (temp > 0)
3384           config_write_interval = temp;
3385         else
3386         {
3387           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3388           status = 2;
3389         }
3390       }
3391       break;
3392
3393       case 'z':
3394       {
3395         int temp;
3396
3397         temp = atoi(optarg);
3398         if (temp > 0)
3399           config_write_jitter = temp;
3400         else
3401         {
3402           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3403           status = 2;
3404         }
3405
3406         break;
3407       }
3408
3409       case 't':
3410       {
3411         int threads;
3412         threads = atoi(optarg);
3413         if (threads >= 1)
3414           config_queue_threads = threads;
3415         else
3416         {
3417           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3418           return 1;
3419         }
3420       }
3421       break;
3422
3423       case 'B':
3424         config_write_base_only = 1;
3425         break;
3426
3427       case 'b':
3428       {
3429         size_t len;
3430         char base_realpath[PATH_MAX];
3431
3432         if (config_base_dir != NULL)
3433           free (config_base_dir);
3434         config_base_dir = strdup (optarg);
3435         if (config_base_dir == NULL)
3436         {
3437           fprintf (stderr, "read_options: strdup failed.\n");
3438           return (3);
3439         }
3440
3441         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3442         {
3443           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3444               config_base_dir, rrd_strerror (errno));
3445           return (3);
3446         }
3447
3448         /* make sure that the base directory is not resolved via
3449          * symbolic links.  this makes some performance-enhancing
3450          * assumptions possible (we don't have to resolve paths
3451          * that start with a "/")
3452          */
3453         if (realpath(config_base_dir, base_realpath) == NULL)
3454         {
3455           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3456               "%s\n", config_base_dir, rrd_strerror(errno));
3457           return 5;
3458         }
3459
3460         len = strlen (config_base_dir);
3461         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3462         {
3463           config_base_dir[len - 1] = 0;
3464           len--;
3465         }
3466
3467         if (len < 1)
3468         {
3469           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3470           return (4);
3471         }
3472
3473         _config_base_dir_len = len;
3474
3475         len = strlen (base_realpath);
3476         while ((len > 0) && (base_realpath[len - 1] == '/'))
3477         {
3478           base_realpath[len - 1] = '\0';
3479           len--;
3480         }
3481
3482         if (strncmp(config_base_dir,
3483                          base_realpath, sizeof(base_realpath)) != 0)
3484         {
3485           fprintf(stderr,
3486                   "Base directory (-b) resolved via file system links!\n"
3487                   "Please consult rrdcached '-b' documentation!\n"
3488                   "Consider specifying the real directory (%s)\n",
3489                   base_realpath);
3490           return 5;
3491         }
3492       }
3493       break;
3494
3495       case 'p':
3496       {
3497         if (config_pid_file != NULL)
3498           free (config_pid_file);
3499         config_pid_file = strdup (optarg);
3500         if (config_pid_file == NULL)
3501         {
3502           fprintf (stderr, "read_options: strdup failed.\n");
3503           return (3);
3504         }
3505       }
3506       break;
3507
3508       case 'F':
3509         config_flush_at_shutdown = 1;
3510         break;
3511
3512       case 'j':
3513       {
3514         char journal_dir_actual[PATH_MAX];
3515         const char *dir;
3516         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3517
3518         status = rrd_mkdir_p(dir, 0777);
3519         if (status != 0)
3520         {
3521           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3522               dir, rrd_strerror(errno));
3523           return 6;
3524         }
3525
3526         if (access(dir, R_OK|W_OK|X_OK) != 0)
3527         {
3528           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3529                   errno ? rrd_strerror(errno) : "");
3530           return 6;
3531         }
3532       }
3533       break;
3534
3535       case 'a':
3536       {
3537         int temp = atoi(optarg);
3538         if (temp > 0)
3539           config_alloc_chunk = temp;
3540         else
3541         {
3542           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3543           return 10;
3544         }
3545       }
3546       break;
3547
3548       case 'h':
3549       case '?':
3550         printf ("RRDCacheD %s\n"
3551             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3552             "\n"
3553             "Usage: rrdcached [options]\n"
3554             "\n"
3555             "Valid options are:\n"
3556             "  -l <address>  Socket address to listen to.\n"
3557             "  -P <perms>    Sets the permissions to assign to all following "
3558                             "sockets\n"
3559             "  -w <seconds>  Interval in which to write data.\n"
3560             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3561             "  -t <threads>  Number of write threads.\n"
3562             "  -f <seconds>  Interval in which to flush dead data.\n"
3563             "  -p <file>     Location of the PID-file.\n"
3564             "  -b <dir>      Base directory to change to.\n"
3565             "  -B            Restrict file access to paths within -b <dir>\n"
3566             "  -g            Do not fork and run in the foreground.\n"
3567             "  -j <dir>      Directory in which to create the journal files.\n"
3568             "  -F            Always flush all updates at shutdown\n"
3569             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3570             "                (the socket will also have read/write permissions "
3571                             "for that group)\n"
3572             "  -m <mode>     File permissions (octal) of all following UNIX "
3573                             "sockets\n"
3574             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3575             "  -O            Do not allow CREATE commands to overwrite existing\n"
3576             "                files, even if asked to.\n"
3577             "\n"
3578             "For more information and a detailed description of all options "
3579             "please refer\n"
3580             "to the rrdcached(1) manual page.\n",
3581             VERSION);
3582         if (option == 'h')
3583           status = -1;
3584         else
3585           status = 1;
3586         break;
3587     } /* switch (option) */
3588   } /* while (getopt) */
3589
3590   /* advise the user when values are not sane */
3591   if (config_flush_interval < 2 * config_write_interval)
3592     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3593             " 2x write interval (-w) !\n");
3594   if (config_write_jitter > config_write_interval)
3595     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3596             " write interval (-w) !\n");
3597
3598   if (config_write_base_only && config_base_dir == NULL)
3599     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3600             "  Consult the rrdcached documentation\n");
3601
3602   if (journal_dir == NULL)
3603     config_flush_at_shutdown = 1;
3604
3605   return (status);
3606 } /* }}} int read_options */
3607
3608 int main (int argc, char **argv)
3609 {
3610   int status;
3611
3612   status = read_options (argc, argv);
3613   if (status != 0)
3614   {
3615     if (status < 0)
3616       status = 0;
3617     return (status);
3618   }
3619
3620   status = daemonize ();
3621   if (status != 0)
3622   {
3623     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3624     return (1);
3625   }
3626
3627   journal_init();
3628
3629   /* start the queue threads */
3630   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3631   if (queue_threads == NULL)
3632   {
3633     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3634     cleanup();
3635     return (1);
3636   }
3637   for (int i = 0; i < config_queue_threads; i++)
3638   {
3639     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3640     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3641     if (status != 0)
3642     {
3643       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3644       cleanup();
3645       return (1);
3646     }
3647   }
3648
3649   /* start the flush thread */
3650   memset(&flush_thread, 0, sizeof(flush_thread));
3651   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3652   if (status != 0)
3653   {
3654     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3655     cleanup();
3656     return (1);
3657   }
3658
3659   listen_thread_main (NULL);
3660   cleanup ();
3661
3662   return (0);
3663 } /* int main */
3664
3665 /*
3666  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3667  */