* introduce a new rrd_create_r2 call to fix the no-overwrite api mess
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
122
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127
128 struct listen_socket_s
129 {
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
133
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
137
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
142
143   char *wbuf;
144   ssize_t wbuf_len;
145
146   uint32_t permissions;
147
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
152
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
160
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
163
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
167
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
173
174   char *syntax;
175   char *help;
176 };
177
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182   char *file;
183   char **values;
184   size_t values_num;            /* number of valid pointers */
185   size_t values_alloc;          /* number of allocated pointers */
186   time_t last_flush_time;
187   time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE  (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190   int flags;
191   pthread_cond_t  flushed;
192   cache_item_t *prev;
193   cache_item_t *next;
194 };
195
196 struct callback_flush_data_s
197 {
198   time_t now;
199   time_t abs_timeout;
200   char **keys;
201   size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
204
205 enum queue_side_e
206 {
207   HEAD,
208   TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
211
212 /* describe a set of journal files */
213 typedef struct {
214   char **files;
215   size_t files_num;
216 } journal_set;
217
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
221
222 /*
223  * Variables
224  */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
227
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
230
231 static listen_socket_t default_socket;
232
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
238
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
242
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
249
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
265
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
268
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
277
278 static int opt_no_overwrite = 0; /* default for the daemon */
279
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL;         /* current journal file handle */
287 static long  journal_size = 0;          /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
293
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
296
297 /* 
298  * Functions
299  */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303   state = FLUSHING;
304   pthread_cond_broadcast(&flush_cond);
305   pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
307
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
309 {
310   sig_common("INT");
311 } /* }}} void sig_int_handler */
312
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
314 {
315   sig_common("TERM");
316 } /* }}} void sig_term_handler */
317
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320   config_flush_at_shutdown = 1;
321   sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
323
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326   config_flush_at_shutdown = 0;
327   sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
329
330 static void install_signal_handlers(void) /* {{{ */
331 {
332   /* These structures are static, because `sigaction' behaves weird if the are
333    * overwritten.. */
334   static struct sigaction sa_int;
335   static struct sigaction sa_term;
336   static struct sigaction sa_pipe;
337   static struct sigaction sa_usr1;
338   static struct sigaction sa_usr2;
339
340   /* Install signal handlers */
341   memset (&sa_int, 0, sizeof (sa_int));
342   sa_int.sa_handler = sig_int_handler;
343   sigaction (SIGINT, &sa_int, NULL);
344
345   memset (&sa_term, 0, sizeof (sa_term));
346   sa_term.sa_handler = sig_term_handler;
347   sigaction (SIGTERM, &sa_term, NULL);
348
349   memset (&sa_pipe, 0, sizeof (sa_pipe));
350   sa_pipe.sa_handler = SIG_IGN;
351   sigaction (SIGPIPE, &sa_pipe, NULL);
352
353   memset (&sa_pipe, 0, sizeof (sa_usr1));
354   sa_usr1.sa_handler = sig_usr1_handler;
355   sigaction (SIGUSR1, &sa_usr1, NULL);
356
357   memset (&sa_usr2, 0, sizeof (sa_usr2));
358   sa_usr2.sa_handler = sig_usr2_handler;
359   sigaction (SIGUSR2, &sa_usr2, NULL);
360
361 } /* }}} void install_signal_handlers */
362
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365   int fd;
366   const char *file;
367   char *file_copy, *dir;
368
369   file = (config_pid_file != NULL)
370     ? config_pid_file
371     : LOCALSTATEDIR "/run/rrdcached.pid";
372
373   /* dirname may modify its argument */
374   file_copy = strdup(file);
375   if (file_copy == NULL)
376   {
377     fprintf(stderr, "rrdcached: strdup(): %s\n",
378         rrd_strerror(errno));
379     return -1;
380   }
381
382   dir = dirname(file_copy);
383   if (rrd_mkdir_p(dir, 0777) != 0)
384   {
385     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386         dir, rrd_strerror(errno));
387     return -1;
388   }
389
390   free(file_copy);
391
392   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393   if (fd < 0)
394     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395             action, file, rrd_strerror(errno));
396
397   return(fd);
398 } /* }}} static int open_pidfile */
399
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
402 {
403   int pid_fd;
404   pid_t pid;
405   char pid_str[16];
406
407   pid_fd = open_pidfile("open", O_RDWR);
408   if (pid_fd < 0)
409     return pid_fd;
410
411   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412     return -1;
413
414   pid = atoi(pid_str);
415   if (pid <= 0)
416     return -1;
417
418   /* another running process that we can signal COULD be
419    * a competing rrdcached */
420   if (pid != getpid() && kill(pid, 0) == 0)
421   {
422     fprintf(stderr,
423             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
427
428   lseek(pid_fd, 0, SEEK_SET);
429   if (ftruncate(pid_fd, 0) == -1)
430   {
431     fprintf(stderr,
432             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433     close(pid_fd);
434     return -1;
435   }
436
437   fprintf(stderr,
438           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439           "rrdcached: starting normally.\n", pid);
440
441   return pid_fd;
442 } /* }}} static int check_pidfile */
443
444 static int write_pidfile (int fd) /* {{{ */
445 {
446   pid_t pid;
447   FILE *fh;
448
449   pid = getpid ();
450
451   fh = fdopen (fd, "w");
452   if (fh == NULL)
453   {
454     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455     close(fd);
456     return (-1);
457   }
458
459   fprintf (fh, "%i\n", (int) pid);
460   fclose (fh);
461
462   return (0);
463 } /* }}} int write_pidfile */
464
465 static int remove_pidfile (void) /* {{{ */
466 {
467   char *file;
468   int status;
469
470   file = (config_pid_file != NULL)
471     ? config_pid_file
472     : LOCALSTATEDIR "/run/rrdcached.pid";
473
474   status = unlink (file);
475   if (status == 0)
476     return (0);
477   return (errno);
478 } /* }}} int remove_pidfile */
479
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482   char *eol;
483
484   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485                sock->next_read - sock->next_cmd);
486
487   if (eol == NULL)
488   {
489     /* no commands left, move remainder back to front of rbuf */
490     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491             sock->next_read - sock->next_cmd);
492     sock->next_read -= sock->next_cmd;
493     sock->next_cmd = 0;
494     *len = 0;
495     return NULL;
496   }
497   else
498   {
499     char *cmd = sock->rbuf + sock->next_cmd;
500     *eol = '\0';
501
502     sock->next_cmd = eol - sock->rbuf + 1;
503
504     if (eol > sock->rbuf && *(eol-1) == '\r')
505       *(--eol) = '\0'; /* handle "\r\n" EOL */
506
507     *len = eol - cmd;
508
509     return cmd;
510   }
511
512   /* NOTREACHED */
513   assert(1==0);
514 } /* }}} char *next_cmd */
515
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519   char *new_buf;
520
521   assert(sock != NULL);
522
523   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524   if (new_buf == NULL)
525   {
526     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527     return -1;
528   }
529
530   strncpy(new_buf + sock->wbuf_len, str, len + 1);
531
532   sock->wbuf = new_buf;
533   sock->wbuf_len += len;
534
535   return 0;
536 } /* }}} static int add_to_wbuf */
537
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541   va_list argp;
542   char buffer[CMD_MAX];
543   int len;
544
545   if (JOURNAL_REPLAY(sock)) return 0;
546   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
547
548   va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552   len = vsprintf(buffer, fmt, argp);
553 #endif
554   va_end(argp);
555   if (len < 0)
556   {
557     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558     return -1;
559   }
560
561   return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
563
564 static int count_lines(char *str) /* {{{ */
565 {
566   int lines = 0;
567
568   if (str != NULL)
569   {
570     while ((str = strchr(str, '\n')) != NULL)
571     {
572       ++lines;
573       ++str;
574     }
575   }
576
577   return lines;
578 } /* }}} static int count_lines */
579
580 /* send the response back to the user.
581  * returns 0 on success, -1 on error
582  * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584                           char *fmt, ...) /* {{{ */
585 {
586   va_list argp;
587   char buffer[CMD_MAX];
588   int lines;
589   ssize_t wrote;
590   int rclen, len;
591
592   if (JOURNAL_REPLAY(sock)) return rc;
593
594   if (sock->batch_start)
595   {
596     if (rc == RESP_OK)
597       return rc; /* no response on success during BATCH */
598     lines = sock->batch_cmd;
599   }
600   else if (rc == RESP_OK)
601     lines = count_lines(sock->wbuf);
602   else
603     lines = -1;
604
605   rclen = sprintf(buffer, "%d ", lines);
606   va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610   len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612   va_end(argp);
613   if (len < 0)
614     return -1;
615
616   len += rclen;
617
618   /* append the result to the wbuf, don't write to the user */
619   if (sock->batch_start)
620     return add_to_wbuf(sock, buffer, len);
621
622   /* first write must be complete */
623   if (len != write(sock->fd, buffer, len))
624   {
625     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626     return -1;
627   }
628
629   if (sock->wbuf != NULL && rc == RESP_OK)
630   {
631     wrote = 0;
632     while (wrote < sock->wbuf_len)
633     {
634       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635       if (wb <= 0)
636       {
637         RRDD_LOG(LOG_INFO, "send_response: could not write results");
638         return -1;
639       }
640       wrote += wb;
641     }
642   }
643
644   free(sock->wbuf); sock->wbuf = NULL;
645   sock->wbuf_len = 0;
646
647   return 0;
648 } /* }}} */
649
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652   ci->values = NULL;
653   ci->values_num = 0;
654   ci->values_alloc = 0;
655
656   ci->last_flush_time = when;
657   if (config_write_jitter > 0)
658     ci->last_flush_time += (rrd_random() % config_write_jitter);
659 }
660
661 /* remove_from_queue
662  * remove a "cache_item_t" item from the queue.
663  * must hold 'cache_lock' when calling this
664  */
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666 {
667   if (ci == NULL) return;
668   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669
670   if (ci->prev == NULL)
671     cache_queue_head = ci->next; /* reset head */
672   else
673     ci->prev->next = ci->next;
674
675   if (ci->next == NULL)
676     cache_queue_tail = ci->prev; /* reset the tail */
677   else
678     ci->next->prev = ci->prev;
679
680   ci->next = ci->prev = NULL;
681   ci->flags &= ~CI_FLAGS_IN_QUEUE;
682
683   pthread_mutex_lock (&stats_lock);
684   assert (stats_queue_length > 0);
685   stats_queue_length--;
686   pthread_mutex_unlock (&stats_lock);
687
688 } /* }}} static void remove_from_queue */
689
690 /* free the resources associated with the cache_item_t
691  * must hold cache_lock when calling this function
692  */
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694 {
695   if (ci == NULL) return NULL;
696
697   remove_from_queue(ci);
698
699   for (size_t i=0; i < ci->values_num; i++)
700     free(ci->values[i]);
701
702   free (ci->values);
703   free (ci->file);
704
705   /* in case anyone is waiting */
706   pthread_cond_broadcast(&ci->flushed);
707   pthread_cond_destroy(&ci->flushed);
708
709   free (ci);
710
711   return NULL;
712 } /* }}} static void *free_cache_item */
713
714 /*
715  * enqueue_cache_item:
716  * `cache_lock' must be acquired before calling this function!
717  */
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
719     queue_side_t side)
720 {
721   if (ci == NULL)
722     return (-1);
723
724   if (ci->values_num == 0)
725     return (0);
726
727   if (side == HEAD)
728   {
729     if (cache_queue_head == ci)
730       return 0;
731
732     /* remove if further down in queue */
733     remove_from_queue(ci);
734
735     ci->prev = NULL;
736     ci->next = cache_queue_head;
737     if (ci->next != NULL)
738       ci->next->prev = ci;
739     cache_queue_head = ci;
740
741     if (cache_queue_tail == NULL)
742       cache_queue_tail = cache_queue_head;
743   }
744   else /* (side == TAIL) */
745   {
746     /* We don't move values back in the list.. */
747     if (ci->flags & CI_FLAGS_IN_QUEUE)
748       return (0);
749
750     assert (ci->next == NULL);
751     assert (ci->prev == NULL);
752
753     ci->prev = cache_queue_tail;
754
755     if (cache_queue_tail == NULL)
756       cache_queue_head = ci;
757     else
758       cache_queue_tail->next = ci;
759
760     cache_queue_tail = ci;
761   }
762
763   ci->flags |= CI_FLAGS_IN_QUEUE;
764
765   pthread_cond_signal(&queue_cond);
766   pthread_mutex_lock (&stats_lock);
767   stats_queue_length++;
768   pthread_mutex_unlock (&stats_lock);
769
770   return (0);
771 } /* }}} int enqueue_cache_item */
772
773 /*
774  * tree_callback_flush:
775  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776  * while this is in progress.
777  */
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
779     gpointer data)
780 {
781   cache_item_t *ci;
782   callback_flush_data_t *cfd;
783
784   ci = (cache_item_t *) value;
785   cfd = (callback_flush_data_t *) data;
786
787   if (ci->flags & CI_FLAGS_IN_QUEUE)
788     return FALSE;
789
790   if (ci->values_num > 0
791       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
792   {
793     enqueue_cache_item (ci, TAIL);
794   }
795   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796       && (ci->values_num <= 0))
797   {
798     assert ((char *) key == ci->file);
799     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
800     {
801       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
802       return (FALSE);
803     }
804   }
805
806   return (FALSE);
807 } /* }}} gboolean tree_callback_flush */
808
809 static int flush_old_values (int max_age)
810 {
811   callback_flush_data_t cfd;
812   size_t k;
813
814   memset (&cfd, 0, sizeof (cfd));
815   /* Pass the current time as user data so that we don't need to call
816    * `time' for each node. */
817   cfd.now = time (NULL);
818   cfd.keys = NULL;
819   cfd.keys_num = 0;
820
821   if (max_age > 0)
822     cfd.abs_timeout = cfd.now - max_age;
823   else
824     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825
826   /* `tree_callback_flush' will return the keys of all values that haven't
827    * been touched in the last `config_flush_interval' seconds in `cfd'.
828    * The char*'s in this array point to the same memory as ci->file, so we
829    * don't need to free them separately. */
830   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831
832   for (k = 0; k < cfd.keys_num; k++)
833   {
834     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835     /* should never fail, since we have held the cache_lock
836      * the entire time */
837     assert(status == TRUE);
838   }
839
840   if (cfd.keys != NULL)
841   {
842     free (cfd.keys);
843     cfd.keys = NULL;
844   }
845
846   return (0);
847 } /* int flush_old_values */
848
849 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
850 {
851   struct timeval now;
852   struct timespec next_flush;
853   int status;
854
855   gettimeofday (&now, NULL);
856   next_flush.tv_sec = now.tv_sec + config_flush_interval;
857   next_flush.tv_nsec = 1000 * now.tv_usec;
858
859   pthread_mutex_lock(&cache_lock);
860
861   while (state == RUNNING)
862   {
863     gettimeofday (&now, NULL);
864     if ((now.tv_sec > next_flush.tv_sec)
865         || ((now.tv_sec == next_flush.tv_sec)
866           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
867     {
868       RRDD_LOG(LOG_DEBUG, "flushing old values");
869
870       /* Determine the time of the next cache flush. */
871       next_flush.tv_sec = now.tv_sec + config_flush_interval;
872
873       /* Flush all values that haven't been written in the last
874        * `config_write_interval' seconds. */
875       flush_old_values (config_write_interval);
876
877       /* unlock the cache while we rotate so we don't block incoming
878        * updates if the fsync() blocks on disk I/O */
879       pthread_mutex_unlock(&cache_lock);
880       journal_rotate();
881       pthread_mutex_lock(&cache_lock);
882     }
883
884     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
885     if (status != 0 && status != ETIMEDOUT)
886     {
887       RRDD_LOG (LOG_ERR, "flush_thread_main: "
888                 "pthread_cond_timedwait returned %i.", status);
889     }
890   }
891
892   if (config_flush_at_shutdown)
893     flush_old_values (-1); /* flush everything */
894
895   state = SHUTDOWN;
896
897   pthread_mutex_unlock(&cache_lock);
898
899   return NULL;
900 } /* void *flush_thread_main */
901
902 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
903 {
904   pthread_mutex_lock (&cache_lock);
905
906   while (state != SHUTDOWN
907          || (cache_queue_head != NULL && config_flush_at_shutdown))
908   {
909     cache_item_t *ci;
910     char *file;
911     char **values;
912     size_t values_num;
913     int status;
914
915     /* Now, check if there's something to store away. If not, wait until
916      * something comes in. */
917     if (cache_queue_head == NULL)
918     {
919       status = pthread_cond_wait (&queue_cond, &cache_lock);
920       if ((status != 0) && (status != ETIMEDOUT))
921       {
922         RRDD_LOG (LOG_ERR, "queue_thread_main: "
923             "pthread_cond_wait returned %i.", status);
924       }
925     }
926
927     /* Check if a value has arrived. This may be NULL if we timed out or there
928      * was an interrupt such as a signal. */
929     if (cache_queue_head == NULL)
930       continue;
931
932     ci = cache_queue_head;
933
934     /* copy the relevant parts */
935     file = strdup (ci->file);
936     if (file == NULL)
937     {
938       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
939       continue;
940     }
941
942     assert(ci->values != NULL);
943     assert(ci->values_num > 0);
944
945     values = ci->values;
946     values_num = ci->values_num;
947
948     wipe_ci_values(ci, time(NULL));
949     remove_from_queue(ci);
950
951     pthread_mutex_unlock (&cache_lock);
952
953     rrd_clear_error ();
954     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
955     if (status != 0)
956     {
957       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
958           "rrd_update_r (%s) failed with status %i. (%s)",
959           file, status, rrd_get_error());
960     }
961
962     journal_write("wrote", file);
963
964     /* Search again in the tree.  It's possible someone issued a "FORGET"
965      * while we were writing the update values. */
966     pthread_mutex_lock(&cache_lock);
967     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
968     if (ci)
969       pthread_cond_broadcast(&ci->flushed);
970     pthread_mutex_unlock(&cache_lock);
971
972     if (status == 0)
973     {
974       pthread_mutex_lock (&stats_lock);
975       stats_updates_written++;
976       stats_data_sets_written += values_num;
977       pthread_mutex_unlock (&stats_lock);
978     }
979
980     rrd_free_ptrs((void ***) &values, &values_num);
981     free(file);
982
983     pthread_mutex_lock (&cache_lock);
984   }
985   pthread_mutex_unlock (&cache_lock);
986
987   return (NULL);
988 } /* }}} void *queue_thread_main */
989
990 static int buffer_get_field (char **buffer_ret, /* {{{ */
991     size_t *buffer_size_ret, char **field_ret)
992 {
993   char *buffer;
994   size_t buffer_pos;
995   size_t buffer_size;
996   char *field;
997   size_t field_size;
998   int status;
999
1000   buffer = *buffer_ret;
1001   buffer_pos = 0;
1002   buffer_size = *buffer_size_ret;
1003   field = *buffer_ret;
1004   field_size = 0;
1005
1006   if (buffer_size <= 0)
1007     return (-1);
1008
1009   /* This is ensured by `handle_request'. */
1010   assert (buffer[buffer_size - 1] == '\0');
1011
1012   status = -1;
1013   while (buffer_pos < buffer_size)
1014   {
1015     /* Check for end-of-field or end-of-buffer */
1016     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1017     {
1018       field[field_size] = 0;
1019       field_size++;
1020       buffer_pos++;
1021       status = 0;
1022       break;
1023     }
1024     /* Handle escaped characters. */
1025     else if (buffer[buffer_pos] == '\\')
1026     {
1027       if (buffer_pos >= (buffer_size - 1))
1028         break;
1029       buffer_pos++;
1030       field[field_size] = buffer[buffer_pos];
1031       field_size++;
1032       buffer_pos++;
1033     }
1034     /* Normal operation */ 
1035     else
1036     {
1037       field[field_size] = buffer[buffer_pos];
1038       field_size++;
1039       buffer_pos++;
1040     }
1041   } /* while (buffer_pos < buffer_size) */
1042
1043   if (status != 0)
1044     return (status);
1045
1046   *buffer_ret = buffer + buffer_pos;
1047   *buffer_size_ret = buffer_size - buffer_pos;
1048   *field_ret = field;
1049
1050   return (0);
1051 } /* }}} int buffer_get_field */
1052
1053 /* if we're restricting writes to the base directory,
1054  * check whether the file falls within the dir
1055  * returns 1 if OK, otherwise 0
1056  */
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058 {
1059   assert(file != NULL);
1060
1061   if (!config_write_base_only
1062       || JOURNAL_REPLAY(sock)
1063       || config_base_dir == NULL)
1064     return 1;
1065
1066   if (strstr(file, "../") != NULL) goto err;
1067
1068   /* relative paths without "../" are ok */
1069   if (*file != '/') return 1;
1070
1071   /* file must be of the format base + "/" + <1+ char filename> */
1072   if (strlen(file) < _config_base_dir_len + 2) goto err;
1073   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074   if (*(file + _config_base_dir_len) != '/') goto err;
1075
1076   return 1;
1077
1078 err:
1079   if (sock != NULL && sock->fd >= 0)
1080     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081
1082   return 0;
1083 } /* }}} static int check_file_access */
1084
1085 /* when using a base dir, convert relative paths to absolute paths.
1086  * if necessary, modifies the "filename" pointer to point
1087  * to the new path created in "tmp".  "tmp" is provided
1088  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1089  *
1090  * this allows us to optimize for the expected case (absolute path)
1091  * with a no-op.
1092  */
1093 static void get_abs_path(char **filename, char *tmp)
1094 {
1095   assert(tmp != NULL);
1096   assert(filename != NULL && *filename != NULL);
1097
1098   if (config_base_dir == NULL || **filename == '/')
1099     return;
1100
1101   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1102   *filename = tmp;
1103 } /* }}} static int get_abs_path */
1104
1105 static int flush_file (const char *filename) /* {{{ */
1106 {
1107   cache_item_t *ci;
1108
1109   pthread_mutex_lock (&cache_lock);
1110
1111   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1112   if (ci == NULL)
1113   {
1114     pthread_mutex_unlock (&cache_lock);
1115     return (ENOENT);
1116   }
1117
1118   if (ci->values_num > 0)
1119   {
1120     /* Enqueue at head */
1121     enqueue_cache_item (ci, HEAD);
1122     pthread_cond_wait(&ci->flushed, &cache_lock);
1123   }
1124
1125   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1126    * may have been purged during our cond_wait() */
1127
1128   pthread_mutex_unlock(&cache_lock);
1129
1130   return (0);
1131 } /* }}} int flush_file */
1132
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134 {
1135   char *err = "Syntax error.\n";
1136
1137   if (cmd && cmd->syntax)
1138     err = cmd->syntax;
1139
1140   return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1142
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144 {
1145   uint64_t copy_queue_length;
1146   uint64_t copy_updates_received;
1147   uint64_t copy_flush_received;
1148   uint64_t copy_updates_written;
1149   uint64_t copy_data_sets_written;
1150   uint64_t copy_journal_bytes;
1151   uint64_t copy_journal_rotate;
1152
1153   uint64_t tree_nodes_number;
1154   uint64_t tree_depth;
1155
1156   pthread_mutex_lock (&stats_lock);
1157   copy_queue_length       = stats_queue_length;
1158   copy_updates_received   = stats_updates_received;
1159   copy_flush_received     = stats_flush_received;
1160   copy_updates_written    = stats_updates_written;
1161   copy_data_sets_written  = stats_data_sets_written;
1162   copy_journal_bytes      = stats_journal_bytes;
1163   copy_journal_rotate     = stats_journal_rotate;
1164   pthread_mutex_unlock (&stats_lock);
1165
1166   pthread_mutex_lock (&cache_lock);
1167   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1169   pthread_mutex_unlock (&cache_lock);
1170
1171   add_response_info(sock,
1172                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1173   add_response_info(sock,
1174                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175   add_response_info(sock,
1176                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177   add_response_info(sock,
1178                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179   add_response_info(sock,
1180                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185
1186   send_response(sock, RESP_OK, "Statistics follow\n");
1187
1188   return (0);
1189 } /* }}} int handle_request_stats */
1190
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192 {
1193   char *file, file_tmp[PATH_MAX];
1194   int status;
1195
1196   status = buffer_get_field (&buffer, &buffer_size, &file);
1197   if (status != 0)
1198   {
1199     return syntax_error(sock,cmd);
1200   }
1201   else
1202   {
1203     pthread_mutex_lock(&stats_lock);
1204     stats_flush_received++;
1205     pthread_mutex_unlock(&stats_lock);
1206
1207     get_abs_path(&file, file_tmp);
1208     if (!check_file_access(file, sock)) return 0;
1209
1210     status = flush_file (file);
1211     if (status == 0)
1212       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213     else if (status == ENOENT)
1214     {
1215       /* no file in our tree; see whether it exists at all */
1216       struct stat statbuf;
1217
1218       memset(&statbuf, 0, sizeof(statbuf));
1219       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1221       else
1222         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1223     }
1224     else if (status < 0)
1225       return send_response(sock, RESP_ERR, "Internal error.\n");
1226     else
1227       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1228   }
1229
1230   /* NOTREACHED */
1231   assert(1==0);
1232 } /* }}} int handle_request_flush */
1233
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235 {
1236   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237
1238   pthread_mutex_lock(&cache_lock);
1239   flush_old_values(-1);
1240   pthread_mutex_unlock(&cache_lock);
1241
1242   return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1244
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246 {
1247   int status;
1248   char *file, file_tmp[PATH_MAX];
1249   cache_item_t *ci;
1250
1251   status = buffer_get_field(&buffer, &buffer_size, &file);
1252   if (status != 0)
1253     return syntax_error(sock,cmd);
1254
1255   get_abs_path(&file, file_tmp);
1256
1257   pthread_mutex_lock(&cache_lock);
1258   ci = g_tree_lookup(cache_tree, file);
1259   if (ci == NULL)
1260   {
1261     pthread_mutex_unlock(&cache_lock);
1262     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1263   }
1264
1265   for (size_t i=0; i < ci->values_num; i++)
1266     add_response_info(sock, "%s\n", ci->values[i]);
1267
1268   pthread_mutex_unlock(&cache_lock);
1269   return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1271
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273 {
1274   int status;
1275   gboolean found;
1276   char *file, file_tmp[PATH_MAX];
1277
1278   status = buffer_get_field(&buffer, &buffer_size, &file);
1279   if (status != 0)
1280     return syntax_error(sock,cmd);
1281
1282   get_abs_path(&file, file_tmp);
1283   if (!check_file_access(file, sock)) return 0;
1284
1285   pthread_mutex_lock(&cache_lock);
1286   found = g_tree_remove(cache_tree, file);
1287   pthread_mutex_unlock(&cache_lock);
1288
1289   if (found == TRUE)
1290   {
1291     if (!JOURNAL_REPLAY(sock))
1292       journal_write("forget", file);
1293
1294     return send_response(sock, RESP_OK, "Gone!\n");
1295   }
1296   else
1297     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298
1299   /* NOTREACHED */
1300   assert(1==0);
1301 } /* }}} static int handle_request_forget */
1302
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304 {
1305   cache_item_t *ci;
1306
1307   pthread_mutex_lock(&cache_lock);
1308
1309   ci = cache_queue_head;
1310   while (ci != NULL)
1311   {
1312     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1313     ci = ci->next;
1314   }
1315
1316   pthread_mutex_unlock(&cache_lock);
1317
1318   return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1320
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322 {
1323   char *file, file_tmp[PATH_MAX];
1324   int values_num = 0;
1325   int status;
1326   char orig_buf[CMD_MAX];
1327
1328   cache_item_t *ci;
1329
1330   /* save it for the journal later */
1331   if (!JOURNAL_REPLAY(sock))
1332     strncpy(orig_buf, buffer, buffer_size);
1333
1334   status = buffer_get_field (&buffer, &buffer_size, &file);
1335   if (status != 0)
1336     return syntax_error(sock,cmd);
1337
1338   pthread_mutex_lock(&stats_lock);
1339   stats_updates_received++;
1340   pthread_mutex_unlock(&stats_lock);
1341
1342   get_abs_path(&file, file_tmp);
1343   if (!check_file_access(file, sock)) return 0;
1344
1345   pthread_mutex_lock (&cache_lock);
1346   ci = g_tree_lookup (cache_tree, file);
1347
1348   if (ci == NULL) /* {{{ */
1349   {
1350     struct stat statbuf;
1351     cache_item_t *tmp;
1352
1353     /* don't hold the lock while we setup; stat(2) might block */
1354     pthread_mutex_unlock(&cache_lock);
1355
1356     memset (&statbuf, 0, sizeof (statbuf));
1357     status = stat (file, &statbuf);
1358     if (status != 0)
1359     {
1360       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361
1362       status = errno;
1363       if (status == ENOENT)
1364         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1365       else
1366         return send_response(sock, RESP_ERR,
1367                              "stat failed with error %i.\n", status);
1368     }
1369     if (!S_ISREG (statbuf.st_mode))
1370       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371
1372     if (access(file, R_OK|W_OK) != 0)
1373       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374                            file, rrd_strerror(errno));
1375
1376     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1377     if (ci == NULL)
1378     {
1379       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380
1381       return send_response(sock, RESP_ERR, "malloc failed.\n");
1382     }
1383     memset (ci, 0, sizeof (cache_item_t));
1384
1385     ci->file = strdup (file);
1386     if (ci->file == NULL)
1387     {
1388       free (ci);
1389       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390
1391       return send_response(sock, RESP_ERR, "strdup failed.\n");
1392     }
1393
1394     wipe_ci_values(ci, now);
1395     ci->flags = CI_FLAGS_IN_TREE;
1396     pthread_cond_init(&ci->flushed, NULL);
1397
1398     pthread_mutex_lock(&cache_lock);
1399
1400     /* another UPDATE might have added this entry in the meantime */
1401     tmp = g_tree_lookup (cache_tree, file);
1402     if (tmp == NULL)
1403       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1404     else
1405     {
1406       free_cache_item (ci);
1407       ci = tmp;
1408     }
1409
1410     /* state may have changed while we were unlocked */
1411     if (state == SHUTDOWN)
1412       return -1;
1413   } /* }}} */
1414   assert (ci != NULL);
1415
1416   /* don't re-write updates in replay mode */
1417   if (!JOURNAL_REPLAY(sock))
1418     journal_write("update", orig_buf);
1419
1420   while (buffer_size > 0)
1421   {
1422     char *value;
1423     time_t stamp;
1424     char *eostamp;
1425
1426     status = buffer_get_field (&buffer, &buffer_size, &value);
1427     if (status != 0)
1428     {
1429       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1430       break;
1431     }
1432
1433     /* make sure update time is always moving forward */
1434     stamp = strtol(value, &eostamp, 10);
1435     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436     {
1437       pthread_mutex_unlock(&cache_lock);
1438       return send_response(sock, RESP_ERR,
1439                            "Cannot find timestamp in '%s'!\n", value);
1440     }
1441     else if (stamp <= ci->last_update_stamp)
1442     {
1443       pthread_mutex_unlock(&cache_lock);
1444       return send_response(sock, RESP_ERR,
1445                            "illegal attempt to update using time %ld when last"
1446                            " update time is %ld (minimum one second step)\n",
1447                            stamp, ci->last_update_stamp);
1448     }
1449     else
1450       ci->last_update_stamp = stamp;
1451
1452     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453                               &ci->values_alloc, config_alloc_chunk))
1454     {
1455       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1456       continue;
1457     }
1458
1459     values_num++;
1460   }
1461
1462   if (((now - ci->last_flush_time) >= config_write_interval)
1463       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464       && (ci->values_num > 0))
1465   {
1466     enqueue_cache_item (ci, TAIL);
1467   }
1468
1469   pthread_mutex_unlock (&cache_lock);
1470
1471   if (values_num < 1)
1472     return send_response(sock, RESP_ERR, "No values updated.\n");
1473   else
1474     return send_response(sock, RESP_OK,
1475                          "errors, enqueued %i value(s).\n", values_num);
1476
1477   /* NOTREACHED */
1478   assert(1==0);
1479
1480 } /* }}} int handle_request_update */
1481
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1483 {
1484   char *file, file_tmp[PATH_MAX];
1485   char *cf;
1486
1487   char *start_str;
1488   char *end_str;
1489   time_t start_tm;
1490   time_t end_tm;
1491
1492   unsigned long step;
1493   unsigned long ds_cnt;
1494   char **ds_namv;
1495   rrd_value_t *data;
1496
1497   int status;
1498   unsigned long i;
1499   time_t t;
1500   rrd_value_t *data_ptr;
1501
1502   file = NULL;
1503   cf = NULL;
1504   start_str = NULL;
1505   end_str = NULL;
1506
1507   /* Read the arguments */
1508   do /* while (0) */
1509   {
1510     status = buffer_get_field (&buffer, &buffer_size, &file);
1511     if (status != 0)
1512       break;
1513
1514     status = buffer_get_field (&buffer, &buffer_size, &cf);
1515     if (status != 0)
1516       break;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519     if (status != 0)
1520     {
1521       start_str = NULL;
1522       status = 0;
1523       break;
1524     }
1525
1526     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527     if (status != 0)
1528     {
1529       end_str = NULL;
1530       status = 0;
1531       break;
1532     }
1533   } while (0);
1534
1535   if (status != 0)
1536     return (syntax_error(sock,cmd));
1537
1538   get_abs_path(&file, file_tmp);
1539   if (!check_file_access(file, sock)) return 0;
1540
1541   status = flush_file (file);
1542   if ((status != 0) && (status != ENOENT))
1543     return (send_response (sock, RESP_ERR,
1544           "flush_file (%s) failed with status %i.\n", file, status));
1545
1546   t = time (NULL); /* "now" */
1547
1548   /* Parse start time */
1549   if (start_str != NULL)
1550   {
1551     char *endptr;
1552     long value;
1553
1554     endptr = NULL;
1555     errno = 0;
1556     value = strtol (start_str, &endptr, /* base = */ 0);
1557     if ((endptr == start_str) || (errno != 0))
1558       return (send_response(sock, RESP_ERR,
1559             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1560             start_str));
1561
1562     if (value > 0)
1563       start_tm = (time_t) value;
1564     else
1565       start_tm = (time_t) (t + value);
1566   }
1567   else
1568   {
1569     start_tm = t - 86400;
1570   }
1571
1572   /* Parse end time */
1573   if (end_str != NULL)
1574   {
1575     char *endptr;
1576     long value;
1577
1578     endptr = NULL;
1579     errno = 0;
1580     value = strtol (end_str, &endptr, /* base = */ 0);
1581     if ((endptr == end_str) || (errno != 0))
1582       return (send_response(sock, RESP_ERR,
1583             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1584             end_str));
1585
1586     if (value > 0)
1587       end_tm = (time_t) value;
1588     else
1589       end_tm = (time_t) (t + value);
1590   }
1591   else
1592   {
1593     end_tm = t;
1594   }
1595
1596   step = -1;
1597   ds_cnt = 0;
1598   ds_namv = NULL;
1599   data = NULL;
1600
1601   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602       &ds_cnt, &ds_namv, &data);
1603   if (status != 0)
1604     return (send_response(sock, RESP_ERR,
1605           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1606
1607   add_response_info (sock, "FlushVersion: %lu\n", 1);
1608   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610   add_response_info (sock, "Step: %lu\n", step);
1611   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1612
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614     size_t str_len = strlen (str); \
1615     if ((buffer_fill + str_len) > sizeof (buffer)) \
1616       str_len = sizeof (buffer) - buffer_fill; \
1617     if (str_len > 0) { \
1618       strncpy (buffer + buffer_fill, str, str_len); \
1619       buffer_fill += str_len; \
1620       assert (buffer_fill <= sizeof (buffer)); \
1621       if (buffer_fill == sizeof (buffer)) \
1622         buffer[buffer_fill - 1] = 0; \
1623       else \
1624         buffer[buffer_fill] = 0; \
1625     } \
1626   } while (0)
1627
1628   { /* Add list of DS names */
1629     char linebuf[1024];
1630     size_t linebuf_fill;
1631
1632     memset (linebuf, 0, sizeof (linebuf));
1633     linebuf_fill = 0;
1634     for (i = 0; i < ds_cnt; i++)
1635     {
1636       if (i > 0)
1637         SSTRCAT (linebuf, " ", linebuf_fill);
1638       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1639       rrd_freemem(ds_namv[i]);
1640     }
1641     rrd_freemem(ds_namv);
1642     add_response_info (sock, "DSName: %s\n", linebuf);
1643   }
1644
1645   /* Add the actual data */
1646   assert (step > 0);
1647   data_ptr = data;
1648   for (t = start_tm + step; t <= end_tm; t += step)
1649   {
1650     char linebuf[1024];
1651     size_t linebuf_fill;
1652     char tmp[128];
1653
1654     memset (linebuf, 0, sizeof (linebuf));
1655     linebuf_fill = 0;
1656     for (i = 0; i < ds_cnt; i++)
1657     {
1658       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659       tmp[sizeof (tmp) - 1] = 0;
1660       SSTRCAT (linebuf, tmp, linebuf_fill);
1661
1662       data_ptr++;
1663     }
1664
1665     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1666   } /* for (t) */
1667   rrd_freemem(data);
1668
1669   return (send_response (sock, RESP_OK, "Success\n"));
1670 #undef SSTRCAT
1671 } /* }}} int handle_request_fetch */
1672
1673 /* we came across a "WROTE" entry during journal replay.
1674  * throw away any values that we have accumulated for this file
1675  */
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1677 {
1678   cache_item_t *ci;
1679   const char *file = buffer;
1680
1681   pthread_mutex_lock(&cache_lock);
1682
1683   ci = g_tree_lookup(cache_tree, file);
1684   if (ci == NULL)
1685   {
1686     pthread_mutex_unlock(&cache_lock);
1687     return (0);
1688   }
1689
1690   if (ci->values)
1691     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1692
1693   wipe_ci_values(ci, now);
1694   remove_from_queue(ci);
1695
1696   pthread_mutex_unlock(&cache_lock);
1697   return (0);
1698 } /* }}} int handle_request_wrote */
1699
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1701 {
1702   char *file, file_tmp[PATH_MAX];
1703   int status;
1704   rrd_info_t *data;
1705
1706   /* obtain filename */
1707   status = buffer_get_field(&buffer, &buffer_size, &file);
1708   if (status != 0)
1709     return syntax_error(sock,cmd);
1710   /* get full pathname */
1711   get_abs_path(&file, file_tmp);
1712   if (!check_file_access(file, sock)) {
1713     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1714   }
1715   /* get data */
1716   rrd_clear_error ();
1717   data = rrd_info_r(file);
1718   if(!data) {
1719     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1720   }
1721   while (data) {
1722       switch (data->type) {
1723       case RD_I_VAL:
1724           if (isnan(data->value.u_val))
1725               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1726           else
1727               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1728           break;
1729       case RD_I_CNT:
1730           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1731           break;
1732       case RD_I_INT:
1733           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1734           break;
1735       case RD_I_STR:
1736           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1737           break;
1738       case RD_I_BLO:
1739           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1740           break;
1741       }
1742       data = data->next;
1743   }
1744   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info  */
1746
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1748 {
1749   char *i, *file, file_tmp[PATH_MAX];
1750   int status;
1751   int idx;
1752   time_t t;
1753
1754   /* obtain filename */
1755   status = buffer_get_field(&buffer, &buffer_size, &file);
1756   if (status != 0)
1757     return syntax_error(sock,cmd);
1758   /* get full pathname */
1759   get_abs_path(&file, file_tmp);
1760   if (!check_file_access(file, sock)) {
1761     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762   }
1763
1764   status = buffer_get_field(&buffer, &buffer_size, &i);
1765   if (status != 0)
1766     return syntax_error(sock,cmd);
1767   idx = atoi(i);
1768   if(idx<0) { 
1769     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1770   }
1771
1772   /* get data */
1773   rrd_clear_error ();
1774   t = rrd_first_r(file,idx);
1775   if(t<1) {
1776     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1777   }
1778   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first  */
1780
1781
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1783 {
1784   char *file, file_tmp[PATH_MAX];
1785   int status;
1786   time_t t, from_file, step;
1787   rrd_file_t * rrd_file;
1788   cache_item_t * ci;
1789   rrd_t rrd; 
1790
1791   /* obtain filename */
1792   status = buffer_get_field(&buffer, &buffer_size, &file);
1793   if (status != 0)
1794     return syntax_error(sock,cmd);
1795   /* get full pathname */
1796   get_abs_path(&file, file_tmp);
1797   if (!check_file_access(file, sock)) {
1798     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1799   }
1800   rrd_clear_error();
1801   rrd_init(&rrd);
1802   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1803   if(!rrd_file) {
1804     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1805   }
1806   from_file = rrd.live_head->last_up;
1807   step = rrd.stat_head->pdp_step;
1808   rrd_close(rrd_file);
1809   pthread_mutex_lock(&cache_lock);
1810   ci = g_tree_lookup(cache_tree, file);
1811   if (ci)
1812     t = ci->last_update_stamp;
1813   else
1814     t = from_file;
1815   pthread_mutex_unlock(&cache_lock);
1816   t -= t % step;
1817   rrd_free(&rrd);
1818   if(t<1) {
1819     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1820   }
1821   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last  */
1823
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1825 {
1826   char *file, file_tmp[PATH_MAX];
1827   char *tok;
1828   int ac = 0;
1829   char *av[128];
1830   int status;
1831   unsigned long step = 300;
1832   time_t last_up = time(NULL)-10;
1833   int no_overwrite = opt_no_overwrite;
1834
1835
1836   /* obtain filename */
1837   status = buffer_get_field(&buffer, &buffer_size, &file);
1838   if (status != 0)
1839     return syntax_error(sock,cmd);
1840   /* get full pathname */
1841   get_abs_path(&file, file_tmp);
1842   if (!check_file_access(file, sock)) {
1843     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1844   }
1845   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1846
1847   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848     if( ! strncmp(tok,"-b",2) ) {
1849       status = buffer_get_field(&buffer, &buffer_size, &tok );
1850       if (status != 0) return syntax_error(sock,cmd);
1851       last_up = (time_t) atol(tok);
1852       continue;
1853     }
1854     if( ! strncmp(tok,"-s",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       step = atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-O",2) ) {
1861       no_overwrite = 1;
1862       continue;
1863     }
1864     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866     return syntax_error(sock,cmd);
1867   }
1868   if(step<1) {
1869     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1870   }
1871   if (last_up < 3600 * 24 * 365 * 10) {
1872     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1873   }
1874
1875   rrd_clear_error ();
1876   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1877
1878   if(!status) {
1879     return send_response(sock, RESP_OK, "RRD created OK\n");
1880   }
1881   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create  */
1883
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1886 {
1887   int status;
1888   if (sock->batch_start)
1889     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1890
1891   status = send_response(sock, RESP_OK,
1892                          "Go ahead.  End with dot '.' on its own line.\n");
1893   sock->batch_start = time(NULL);
1894   sock->batch_cmd = 0;
1895
1896   return status;
1897 } /* }}} static int batch_start */
1898
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1901 {
1902   assert(sock->batch_start);
1903   sock->batch_start = 0;
1904   sock->batch_cmd  = 0;
1905   return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
1907
1908 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1909 {
1910   return -1;
1911 } /* }}} static int handle_request_quit */
1912
1913 static command_t list_of_commands[] = { /* {{{ */
1914   {
1915     "UPDATE",
1916     handle_request_update,
1917     CMD_CONTEXT_ANY,
1918     "UPDATE <filename> <values> [<values> ...]\n"
1919     ,
1920     "Adds the given file to the internal cache if it is not yet known and\n"
1921     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1922     "for details.\n"
1923     "\n"
1924     "Each <values> has the following form:\n"
1925     "  <values> = <time>:<value>[:<value>[...]]\n"
1926     "See the rrdupdate(1) manpage for details.\n"
1927   },
1928   {
1929     "WROTE",
1930     handle_request_wrote,
1931     CMD_CONTEXT_JOURNAL,
1932     NULL,
1933     NULL
1934   },
1935   {
1936     "FLUSH",
1937     handle_request_flush,
1938     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939     "FLUSH <filename>\n"
1940     ,
1941     "Adds the given filename to the head of the update queue and returns\n"
1942     "after it has been dequeued.\n"
1943   },
1944   {
1945     "FLUSHALL",
1946     handle_request_flushall,
1947     CMD_CONTEXT_CLIENT,
1948     "FLUSHALL\n"
1949     ,
1950     "Triggers writing of all pending updates.  Returns immediately.\n"
1951   },
1952   {
1953     "PENDING",
1954     handle_request_pending,
1955     CMD_CONTEXT_CLIENT,
1956     "PENDING <filename>\n"
1957     ,
1958     "Shows any 'pending' updates for a file, in order.\n"
1959     "The updates shown have not yet been written to the underlying RRD file.\n"
1960   },
1961   {
1962     "FORGET",
1963     handle_request_forget,
1964     CMD_CONTEXT_ANY,
1965     "FORGET <filename>\n"
1966     ,
1967     "Removes the file completely from the cache.\n"
1968     "Any pending updates for the file will be lost.\n"
1969   },
1970   {
1971     "QUEUE",
1972     handle_request_queue,
1973     CMD_CONTEXT_CLIENT,
1974     "QUEUE\n"
1975     ,
1976         "Shows all files in the output queue.\n"
1977     "The output is zero or more lines in the following format:\n"
1978     "(where <num_vals> is the number of values to be written)\n"
1979     "\n"
1980     "<num_vals> <filename>\n"
1981   },
1982   {
1983     "STATS",
1984     handle_request_stats,
1985     CMD_CONTEXT_CLIENT,
1986     "STATS\n"
1987     ,
1988     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989     "a description of the values.\n"
1990   },
1991   {
1992     "HELP",
1993     handle_request_help,
1994     CMD_CONTEXT_CLIENT,
1995     "HELP [<command>]\n",
1996     NULL, /* special! */
1997   },
1998   {
1999     "BATCH",
2000     batch_start,
2001     CMD_CONTEXT_CLIENT,
2002     "BATCH\n"
2003     ,
2004     "The 'BATCH' command permits the client to initiate a bulk load\n"
2005     "   of commands to rrdcached.\n"
2006     "\n"
2007     "Usage:\n"
2008     "\n"
2009     "    client: BATCH\n"
2010     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2011     "    client: command #1\n"
2012     "    client: command #2\n"
2013     "    client: ... and so on\n"
2014     "    client: .\n"
2015     "    server: 2 errors\n"
2016     "    server: 7 message for command #7\n"
2017     "    server: 9 message for command #9\n"
2018     "\n"
2019     "For more information, consult the rrdcached(1) documentation.\n"
2020   },
2021   {
2022     ".",   /* BATCH terminator */
2023     batch_done,
2024     CMD_CONTEXT_BATCH,
2025     NULL,
2026     NULL
2027   },
2028   {
2029     "FETCH",
2030     handle_request_fetch,
2031     CMD_CONTEXT_CLIENT,
2032     "FETCH <file> <CF> [<start> [<end>]]\n"
2033     ,
2034     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2035   },
2036   {
2037     "INFO",
2038     handle_request_info,
2039     CMD_CONTEXT_CLIENT,
2040     "INFO <filename>\n",
2041     "The INFO command retrieves information about a specified RRD file.\n"
2042     "This is returned in standard rrdinfo format, a sequence of lines\n"
2043     "with the format <keyname> = <value>\n"
2044     "Note that this is the data as of the last update of the RRD file itself,\n"
2045     "not the last time data was received via rrdcached, so there may be pending\n"
2046     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2047   },
2048   {
2049     "FIRST",
2050     handle_request_first,
2051     CMD_CONTEXT_CLIENT,
2052     "FIRST <filename> <rra index>\n",
2053     "The FIRST command retrieves the first data time for a specified RRA in\n"
2054     "an RRD file.\n"
2055   },
2056   {
2057     "LAST",
2058     handle_request_last,
2059     CMD_CONTEXT_CLIENT,
2060     "LAST <filename>\n",
2061     "The LAST command retrieves the last update time for a specified RRD file.\n"
2062     "Note that this is the time of the last update of the RRD file itself, not\n"
2063     "the last time data was received via rrdcached, so there may be pending\n"
2064     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2065   },
2066   {
2067     "CREATE",
2068     handle_request_create,
2069     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071     "The CREATE command will create an RRD file, overwriting any existing file\n"
2072     "unless the -O option is given or rrdcached was started with the -O option.\n"
2073     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074     "not acceptable) and the step is in seconds (default is 300).\n"
2075     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2076   },
2077   {
2078     "QUIT",
2079     handle_request_quit,
2080     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2081     "QUIT\n"
2082     ,
2083     "Disconnect from rrdcached.\n"
2084   }
2085 }; /* }}} command_t list_of_commands[] */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087   / sizeof (list_of_commands[0]);
2088
2089 static command_t *find_command(char *cmd)
2090 {
2091   size_t i;
2092
2093   for (i = 0; i < list_of_commands_len; i++)
2094     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095       return (&list_of_commands[i]);
2096   return NULL;
2097 }
2098
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2101  * outside these functions so that switching to a more elegant storage method
2102  * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2104 {
2105   size_t i;
2106
2107   for (i = 0; i < list_of_commands_len; i++)
2108     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109       return ((ssize_t) i);
2110   return (-1);
2111 } /* }}} ssize_t find_command_index */
2112
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2114     const char *cmd)
2115 {
2116   ssize_t i;
2117
2118   if (JOURNAL_REPLAY(sock))
2119     return (1);
2120
2121   if (cmd == NULL)
2122     return (-1);
2123
2124   if ((strcasecmp ("QUIT", cmd) == 0)
2125       || (strcasecmp ("HELP", cmd) == 0))
2126     return (1);
2127   else if (strcmp (".", cmd) == 0)
2128     cmd = "BATCH";
2129
2130   i = find_command_index (cmd);
2131   if (i < 0)
2132     return (-1);
2133   assert (i < 32);
2134
2135   if ((sock->permissions & (1 << i)) != 0)
2136     return (1);
2137   return (0);
2138 } /* }}} int socket_permission_check */
2139
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2141     const char *cmd)
2142 {
2143   ssize_t i;
2144
2145   i = find_command_index (cmd);
2146   if (i < 0)
2147     return (-1);
2148   assert (i < 32);
2149
2150   sock->permissions |= (1 << i);
2151   return (0);
2152 } /* }}} int socket_permission_add */
2153
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2155 {
2156   sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
2158
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160     listen_socket_t *src)
2161 {
2162   dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2164
2165 /* check whether commands are received in the expected context */
2166 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2167 {
2168   if (JOURNAL_REPLAY(sock))
2169     return (cmd->context & CMD_CONTEXT_JOURNAL);
2170   else if (sock->batch_start)
2171     return (cmd->context & CMD_CONTEXT_BATCH);
2172   else
2173     return (cmd->context & CMD_CONTEXT_CLIENT);
2174
2175   /* NOTREACHED */
2176   assert(1==0);
2177 }
2178
2179 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2180 {
2181   int status;
2182   char *cmd_str;
2183   char *resp_txt;
2184   command_t *help = NULL;
2185
2186   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2187   if (status == 0)
2188     help = find_command(cmd_str);
2189
2190   if (help && (help->syntax || help->help))
2191   {
2192     char tmp[CMD_MAX];
2193
2194     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2195     resp_txt = tmp;
2196
2197     if (help->syntax)
2198       add_response_info(sock, "Usage: %s\n", help->syntax);
2199
2200     if (help->help)
2201       add_response_info(sock, "%s\n", help->help);
2202   }
2203   else
2204   {
2205     size_t i;
2206
2207     resp_txt = "Command overview\n";
2208
2209     for (i = 0; i < list_of_commands_len; i++)
2210     {
2211       if (list_of_commands[i].syntax == NULL)
2212         continue;
2213       add_response_info (sock, "%s", list_of_commands[i].syntax);
2214     }
2215   }
2216
2217   return send_response(sock, RESP_OK, resp_txt);
2218 } /* }}} int handle_request_help */
2219
2220 static int handle_request (DISPATCH_PROTO) /* {{{ */
2221 {
2222   char *buffer_ptr = buffer;
2223   char *cmd_str = NULL;
2224   command_t *cmd = NULL;
2225   int status;
2226
2227   assert (buffer[buffer_size - 1] == '\0');
2228
2229   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2230   if (status != 0)
2231   {
2232     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2233     return (-1);
2234   }
2235
2236   if (sock != NULL && sock->batch_start)
2237     sock->batch_cmd++;
2238
2239   cmd = find_command(cmd_str);
2240   if (!cmd)
2241     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2242
2243   if (!socket_permission_check (sock, cmd->cmd))
2244     return send_response(sock, RESP_ERR, "Permission denied.\n");
2245
2246   if (!command_check_context(sock, cmd))
2247     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2248
2249   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2250 } /* }}} int handle_request */
2251
2252 static void journal_set_free (journal_set *js) /* {{{ */
2253 {
2254   if (js == NULL)
2255     return;
2256
2257   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2258
2259   free(js);
2260 } /* }}} journal_set_free */
2261
2262 static void journal_set_remove (journal_set *js) /* {{{ */
2263 {
2264   if (js == NULL)
2265     return;
2266
2267   for (uint i=0; i < js->files_num; i++)
2268   {
2269     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2270     unlink(js->files[i]);
2271   }
2272 } /* }}} journal_set_remove */
2273
2274 /* close current journal file handle.
2275  * MUST hold journal_lock before calling */
2276 static void journal_close(void) /* {{{ */
2277 {
2278   if (journal_fh != NULL)
2279   {
2280     if (fclose(journal_fh) != 0)
2281       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2282   }
2283
2284   journal_fh = NULL;
2285   journal_size = 0;
2286 } /* }}} journal_close */
2287
2288 /* MUST hold journal_lock before calling */
2289 static void journal_new_file(void) /* {{{ */
2290 {
2291   struct timeval now;
2292   int  new_fd;
2293   char new_file[PATH_MAX + 1];
2294
2295   assert(journal_dir != NULL);
2296   assert(journal_cur != NULL);
2297
2298   journal_close();
2299
2300   gettimeofday(&now, NULL);
2301   /* this format assures that the files sort in strcmp() order */
2302   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2303            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2304
2305   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2306                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2307   if (new_fd < 0)
2308     goto error;
2309
2310   journal_fh = fdopen(new_fd, "a");
2311   if (journal_fh == NULL)
2312     goto error;
2313
2314   journal_size = ftell(journal_fh);
2315   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2316
2317   /* record the file in the journal set */
2318   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2319
2320   return;
2321
2322 error:
2323   RRDD_LOG(LOG_CRIT,
2324            "JOURNALING DISABLED: Error while trying to create %s : %s",
2325            new_file, rrd_strerror(errno));
2326   RRDD_LOG(LOG_CRIT,
2327            "JOURNALING DISABLED: All values will be flushed at shutdown");
2328
2329   close(new_fd);
2330   config_flush_at_shutdown = 1;
2331
2332 } /* }}} journal_new_file */
2333
2334 /* MUST NOT hold journal_lock before calling this */
2335 static void journal_rotate(void) /* {{{ */
2336 {
2337   journal_set *old_js = NULL;
2338
2339   if (journal_dir == NULL)
2340     return;
2341
2342   RRDD_LOG(LOG_DEBUG, "rotating journals");
2343
2344   pthread_mutex_lock(&stats_lock);
2345   ++stats_journal_rotate;
2346   pthread_mutex_unlock(&stats_lock);
2347
2348   pthread_mutex_lock(&journal_lock);
2349
2350   journal_close();
2351
2352   /* rotate the journal sets */
2353   old_js = journal_old;
2354   journal_old = journal_cur;
2355   journal_cur = calloc(1, sizeof(journal_set));
2356
2357   if (journal_cur != NULL)
2358     journal_new_file();
2359   else
2360     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2361
2362   pthread_mutex_unlock(&journal_lock);
2363
2364   journal_set_remove(old_js);
2365   journal_set_free  (old_js);
2366
2367 } /* }}} static void journal_rotate */
2368
2369 /* MUST hold journal_lock when calling */
2370 static void journal_done(void) /* {{{ */
2371 {
2372   if (journal_cur == NULL)
2373     return;
2374
2375   journal_close();
2376
2377   if (config_flush_at_shutdown)
2378   {
2379     RRDD_LOG(LOG_INFO, "removing journals");
2380     journal_set_remove(journal_old);
2381     journal_set_remove(journal_cur);
2382   }
2383   else
2384   {
2385     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2386              "journals will be used at next startup");
2387   }
2388
2389   journal_set_free(journal_cur);
2390   journal_set_free(journal_old);
2391   free(journal_dir);
2392
2393 } /* }}} static void journal_done */
2394
2395 static int journal_write(char *cmd, char *args) /* {{{ */
2396 {
2397   int chars;
2398
2399   if (journal_fh == NULL)
2400     return 0;
2401
2402   pthread_mutex_lock(&journal_lock);
2403   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2404   journal_size += chars;
2405
2406   if (journal_size > JOURNAL_MAX)
2407     journal_new_file();
2408
2409   pthread_mutex_unlock(&journal_lock);
2410
2411   if (chars > 0)
2412   {
2413     pthread_mutex_lock(&stats_lock);
2414     stats_journal_bytes += chars;
2415     pthread_mutex_unlock(&stats_lock);
2416   }
2417
2418   return chars;
2419 } /* }}} static int journal_write */
2420
2421 static int journal_replay (const char *file) /* {{{ */
2422 {
2423   FILE *fh;
2424   int entry_cnt = 0;
2425   int fail_cnt = 0;
2426   uint64_t line = 0;
2427   char entry[CMD_MAX];
2428   time_t now;
2429
2430   if (file == NULL) return 0;
2431
2432   {
2433     char *reason = "unknown error";
2434     int status = 0;
2435     struct stat statbuf;
2436
2437     memset(&statbuf, 0, sizeof(statbuf));
2438     if (stat(file, &statbuf) != 0)
2439     {
2440       reason = "stat error";
2441       status = errno;
2442     }
2443     else if (!S_ISREG(statbuf.st_mode))
2444     {
2445       reason = "not a regular file";
2446       status = EPERM;
2447     }
2448     if (statbuf.st_uid != daemon_uid)
2449     {
2450       reason = "not owned by daemon user";
2451       status = EACCES;
2452     }
2453     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2454     {
2455       reason = "must not be user/group writable";
2456       status = EACCES;
2457     }
2458
2459     if (status != 0)
2460     {
2461       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2462                file, rrd_strerror(status), reason);
2463       return 0;
2464     }
2465   }
2466
2467   fh = fopen(file, "r");
2468   if (fh == NULL)
2469   {
2470     if (errno != ENOENT)
2471       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2472                file, rrd_strerror(errno));
2473     return 0;
2474   }
2475   else
2476     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2477
2478   now = time(NULL);
2479
2480   while(!feof(fh))
2481   {
2482     size_t entry_len;
2483
2484     ++line;
2485     if (fgets(entry, sizeof(entry), fh) == NULL)
2486       break;
2487     entry_len = strlen(entry);
2488
2489     /* check \n termination in case journal writing crashed mid-line */
2490     if (entry_len == 0)
2491       continue;
2492     else if (entry[entry_len - 1] != '\n')
2493     {
2494       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2495       ++fail_cnt;
2496       continue;
2497     }
2498
2499     entry[entry_len - 1] = '\0';
2500
2501     if (handle_request(NULL, now, entry, entry_len) == 0)
2502       ++entry_cnt;
2503     else
2504       ++fail_cnt;
2505   }
2506
2507   fclose(fh);
2508
2509   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2510            entry_cnt, fail_cnt);
2511
2512   return entry_cnt > 0 ? 1 : 0;
2513 } /* }}} static int journal_replay */
2514
2515 static int journal_sort(const void *v1, const void *v2)
2516 {
2517   char **jn1 = (char **) v1;
2518   char **jn2 = (char **) v2;
2519
2520   return strcmp(*jn1,*jn2);
2521 }
2522
2523 static void journal_init(void) /* {{{ */
2524 {
2525   int had_journal = 0;
2526   DIR *dir;
2527   struct dirent *dent;
2528   char path[PATH_MAX+1];
2529
2530   if (journal_dir == NULL) return;
2531
2532   pthread_mutex_lock(&journal_lock);
2533
2534   journal_cur = calloc(1, sizeof(journal_set));
2535   if (journal_cur == NULL)
2536   {
2537     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2538     return;
2539   }
2540
2541   RRDD_LOG(LOG_INFO, "checking for journal files");
2542
2543   /* Handle old journal files during transition.  This gives them the
2544    * correct sort order.  TODO: remove after first release
2545    */
2546   {
2547     char old_path[PATH_MAX+1];
2548     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2549     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2550     rename(old_path, path);
2551
2552     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2553     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2554     rename(old_path, path);
2555   }
2556
2557   dir = opendir(journal_dir);
2558   if (!dir) {
2559     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2560     return;
2561   }
2562   while ((dent = readdir(dir)) != NULL)
2563   {
2564     /* looks like a journal file? */
2565     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2566       continue;
2567
2568     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2569
2570     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2571     {
2572       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2573                dent->d_name);
2574       break;
2575     }
2576   }
2577   closedir(dir);
2578
2579   qsort(journal_cur->files, journal_cur->files_num,
2580         sizeof(journal_cur->files[0]), journal_sort);
2581
2582   for (uint i=0; i < journal_cur->files_num; i++)
2583     had_journal += journal_replay(journal_cur->files[i]);
2584
2585   journal_new_file();
2586
2587   /* it must have been a crash.  start a flush */
2588   if (had_journal && config_flush_at_shutdown)
2589     flush_old_values(-1);
2590
2591   pthread_mutex_unlock(&journal_lock);
2592
2593   RRDD_LOG(LOG_INFO, "journal processing complete");
2594
2595 } /* }}} static void journal_init */
2596
2597 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2598 {
2599   assert(sock != NULL);
2600
2601   free(sock->rbuf);  sock->rbuf = NULL;
2602   free(sock->wbuf);  sock->wbuf = NULL;
2603   free(sock);
2604 } /* }}} void free_listen_socket */
2605
2606 static void close_connection(listen_socket_t *sock) /* {{{ */
2607 {
2608   if (sock->fd >= 0)
2609   {
2610     close(sock->fd);
2611     sock->fd = -1;
2612   }
2613
2614   free_listen_socket(sock);
2615
2616 } /* }}} void close_connection */
2617
2618 static void *connection_thread_main (void *args) /* {{{ */
2619 {
2620   listen_socket_t *sock;
2621   int fd;
2622
2623   sock = (listen_socket_t *) args;
2624   fd = sock->fd;
2625
2626   /* init read buffers */
2627   sock->next_read = sock->next_cmd = 0;
2628   sock->rbuf = malloc(RBUF_SIZE);
2629   if (sock->rbuf == NULL)
2630   {
2631     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2632     close_connection(sock);
2633     return NULL;
2634   }
2635
2636   pthread_mutex_lock (&connection_threads_lock);
2637   connection_threads_num++;
2638   pthread_mutex_unlock (&connection_threads_lock);
2639
2640   while (state == RUNNING)
2641   {
2642     char *cmd;
2643     ssize_t cmd_len;
2644     ssize_t rbytes;
2645     time_t now;
2646
2647     struct pollfd pollfd;
2648     int status;
2649
2650     pollfd.fd = fd;
2651     pollfd.events = POLLIN | POLLPRI;
2652     pollfd.revents = 0;
2653
2654     status = poll (&pollfd, 1, /* timeout = */ 500);
2655     if (state != RUNNING)
2656       break;
2657     else if (status == 0) /* timeout */
2658       continue;
2659     else if (status < 0) /* error */
2660     {
2661       status = errno;
2662       if (status != EINTR)
2663         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2664       continue;
2665     }
2666
2667     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2668       break;
2669     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2670     {
2671       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2672           "poll(2) returned something unexpected: %#04hx",
2673           pollfd.revents);
2674       break;
2675     }
2676
2677     rbytes = read(fd, sock->rbuf + sock->next_read,
2678                   RBUF_SIZE - sock->next_read);
2679     if (rbytes < 0)
2680     {
2681       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2682       break;
2683     }
2684     else if (rbytes == 0)
2685       break; /* eof */
2686
2687     sock->next_read += rbytes;
2688
2689     if (sock->batch_start)
2690       now = sock->batch_start;
2691     else
2692       now = time(NULL);
2693
2694     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2695     {
2696       status = handle_request (sock, now, cmd, cmd_len+1);
2697       if (status != 0)
2698         goto out_close;
2699     }
2700   }
2701
2702 out_close:
2703   close_connection(sock);
2704
2705   /* Remove this thread from the connection threads list */
2706   pthread_mutex_lock (&connection_threads_lock);
2707   connection_threads_num--;
2708   if (connection_threads_num <= 0)
2709     pthread_cond_broadcast(&connection_threads_done);
2710   pthread_mutex_unlock (&connection_threads_lock);
2711
2712   return (NULL);
2713 } /* }}} void *connection_thread_main */
2714
2715 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2716 {
2717   int fd;
2718   struct sockaddr_un sa;
2719   listen_socket_t *temp;
2720   int status;
2721   const char *path;
2722   char *path_copy, *dir;
2723
2724   path = sock->addr;
2725   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2726     path += strlen("unix:");
2727
2728   /* dirname may modify its argument */
2729   path_copy = strdup(path);
2730   if (path_copy == NULL)
2731   {
2732     fprintf(stderr, "rrdcached: strdup(): %s\n",
2733         rrd_strerror(errno));
2734     return (-1);
2735   }
2736
2737   dir = dirname(path_copy);
2738   if (rrd_mkdir_p(dir, 0777) != 0)
2739   {
2740     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2741         dir, rrd_strerror(errno));
2742     return (-1);
2743   }
2744
2745   free(path_copy);
2746
2747   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2748       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2749   if (temp == NULL)
2750   {
2751     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2752     return (-1);
2753   }
2754   listen_fds = temp;
2755   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2756
2757   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2758   if (fd < 0)
2759   {
2760     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2761              rrd_strerror(errno));
2762     return (-1);
2763   }
2764
2765   memset (&sa, 0, sizeof (sa));
2766   sa.sun_family = AF_UNIX;
2767   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2768
2769   /* if we've gotten this far, we own the pid file.  any daemon started
2770    * with the same args must not be alive.  therefore, ensure that we can
2771    * create the socket...
2772    */
2773   unlink(path);
2774
2775   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2776   if (status != 0)
2777   {
2778     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2779              path, rrd_strerror(errno));
2780     close (fd);
2781     return (-1);
2782   }
2783
2784   /* tweak the sockets group ownership */
2785   if (sock->socket_group != (gid_t)-1)
2786   {
2787     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2788          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2789     {
2790       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2791     }
2792   }
2793
2794   if (sock->socket_permissions != (mode_t)-1)
2795   {
2796     if (chmod(path, sock->socket_permissions) != 0)
2797       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2798           (unsigned int)sock->socket_permissions, strerror(errno));
2799   }
2800
2801   status = listen (fd, /* backlog = */ 10);
2802   if (status != 0)
2803   {
2804     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2805              path, rrd_strerror(errno));
2806     close (fd);
2807     unlink (path);
2808     return (-1);
2809   }
2810
2811   listen_fds[listen_fds_num].fd = fd;
2812   listen_fds[listen_fds_num].family = PF_UNIX;
2813   strncpy(listen_fds[listen_fds_num].addr, path,
2814           sizeof (listen_fds[listen_fds_num].addr) - 1);
2815   listen_fds_num++;
2816
2817   return (0);
2818 } /* }}} int open_listen_socket_unix */
2819
2820 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2821 {
2822   struct addrinfo ai_hints;
2823   struct addrinfo *ai_res;
2824   struct addrinfo *ai_ptr;
2825   char addr_copy[NI_MAXHOST];
2826   char *addr;
2827   char *port;
2828   int status;
2829
2830   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2831   addr_copy[sizeof (addr_copy) - 1] = 0;
2832   addr = addr_copy;
2833
2834   memset (&ai_hints, 0, sizeof (ai_hints));
2835   ai_hints.ai_flags = 0;
2836 #ifdef AI_ADDRCONFIG
2837   ai_hints.ai_flags |= AI_ADDRCONFIG;
2838 #endif
2839   ai_hints.ai_family = AF_UNSPEC;
2840   ai_hints.ai_socktype = SOCK_STREAM;
2841
2842   port = NULL;
2843   if (*addr == '[') /* IPv6+port format */
2844   {
2845     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2846     addr++;
2847
2848     port = strchr (addr, ']');
2849     if (port == NULL)
2850     {
2851       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2852       return (-1);
2853     }
2854     *port = 0;
2855     port++;
2856
2857     if (*port == ':')
2858       port++;
2859     else if (*port == 0)
2860       port = NULL;
2861     else
2862     {
2863       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2864       return (-1);
2865     }
2866   } /* if (*addr == '[') */
2867   else
2868   {
2869     port = rindex(addr, ':');
2870     if (port != NULL)
2871     {
2872       *port = 0;
2873       port++;
2874     }
2875   }
2876   ai_res = NULL;
2877   status = getaddrinfo (addr,
2878                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2879                         &ai_hints, &ai_res);
2880   if (status != 0)
2881   {
2882     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2883              addr, gai_strerror (status));
2884     return (-1);
2885   }
2886
2887   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2888   {
2889     int fd;
2890     listen_socket_t *temp;
2891     int one = 1;
2892
2893     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2894         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2895     if (temp == NULL)
2896     {
2897       fprintf (stderr,
2898                "rrdcached: open_listen_socket_network: realloc failed.\n");
2899       continue;
2900     }
2901     listen_fds = temp;
2902     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2903
2904     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2905     if (fd < 0)
2906     {
2907       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2908                rrd_strerror(errno));
2909       continue;
2910     }
2911
2912     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2913
2914     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2915     if (status != 0)
2916     {
2917       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2918                sock->addr, rrd_strerror(errno));
2919       close (fd);
2920       continue;
2921     }
2922
2923     status = listen (fd, /* backlog = */ 10);
2924     if (status != 0)
2925     {
2926       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2927                sock->addr, rrd_strerror(errno));
2928       close (fd);
2929       freeaddrinfo(ai_res);
2930       return (-1);
2931     }
2932
2933     listen_fds[listen_fds_num].fd = fd;
2934     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2935     listen_fds_num++;
2936   } /* for (ai_ptr) */
2937
2938   freeaddrinfo(ai_res);
2939   return (0);
2940 } /* }}} static int open_listen_socket_network */
2941
2942 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2943 {
2944   assert(sock != NULL);
2945   assert(sock->addr != NULL);
2946
2947   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2948       || sock->addr[0] == '/')
2949     return (open_listen_socket_unix(sock));
2950   else
2951     return (open_listen_socket_network(sock));
2952 } /* }}} int open_listen_socket */
2953
2954 static int close_listen_sockets (void) /* {{{ */
2955 {
2956   size_t i;
2957
2958   for (i = 0; i < listen_fds_num; i++)
2959   {
2960     close (listen_fds[i].fd);
2961
2962     if (listen_fds[i].family == PF_UNIX)
2963       unlink(listen_fds[i].addr);
2964   }
2965
2966   free (listen_fds);
2967   listen_fds = NULL;
2968   listen_fds_num = 0;
2969
2970   return (0);
2971 } /* }}} int close_listen_sockets */
2972
2973 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2974 {
2975   struct pollfd *pollfds;
2976   int pollfds_num;
2977   int status;
2978   int i;
2979
2980   if (listen_fds_num < 1)
2981   {
2982     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2983     return (NULL);
2984   }
2985
2986   pollfds_num = listen_fds_num;
2987   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2988   if (pollfds == NULL)
2989   {
2990     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2991     return (NULL);
2992   }
2993   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2994
2995   RRDD_LOG(LOG_INFO, "listening for connections");
2996
2997   while (state == RUNNING)
2998   {
2999     for (i = 0; i < pollfds_num; i++)
3000     {
3001       pollfds[i].fd = listen_fds[i].fd;
3002       pollfds[i].events = POLLIN | POLLPRI;
3003       pollfds[i].revents = 0;
3004     }
3005
3006     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3007     if (state != RUNNING)
3008       break;
3009     else if (status == 0) /* timeout */
3010       continue;
3011     else if (status < 0) /* error */
3012     {
3013       status = errno;
3014       if (status != EINTR)
3015       {
3016         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3017       }
3018       continue;
3019     }
3020
3021     for (i = 0; i < pollfds_num; i++)
3022     {
3023       listen_socket_t *client_sock;
3024       struct sockaddr_storage client_sa;
3025       socklen_t client_sa_size;
3026       pthread_t tid;
3027       pthread_attr_t attr;
3028
3029       if (pollfds[i].revents == 0)
3030         continue;
3031
3032       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3033       {
3034         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3035             "poll(2) returned something unexpected for listen FD #%i.",
3036             pollfds[i].fd);
3037         continue;
3038       }
3039
3040       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3041       if (client_sock == NULL)
3042       {
3043         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3044         continue;
3045       }
3046       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3047
3048       client_sa_size = sizeof (client_sa);
3049       client_sock->fd = accept (pollfds[i].fd,
3050           (struct sockaddr *) &client_sa, &client_sa_size);
3051       if (client_sock->fd < 0)
3052       {
3053         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3054         free(client_sock);
3055         continue;
3056       }
3057
3058       pthread_attr_init (&attr);
3059       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3060
3061       status = pthread_create (&tid, &attr, connection_thread_main,
3062                                client_sock);
3063       if (status != 0)
3064       {
3065         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3066         close_connection(client_sock);
3067         continue;
3068       }
3069     } /* for (pollfds_num) */
3070   } /* while (state == RUNNING) */
3071
3072   RRDD_LOG(LOG_INFO, "starting shutdown");
3073
3074   close_listen_sockets ();
3075
3076   pthread_mutex_lock (&connection_threads_lock);
3077   while (connection_threads_num > 0)
3078     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3079   pthread_mutex_unlock (&connection_threads_lock);
3080
3081   free(pollfds);
3082
3083   return (NULL);
3084 } /* }}} void *listen_thread_main */
3085
3086 static int daemonize (void) /* {{{ */
3087 {
3088   int pid_fd;
3089   char *base_dir;
3090
3091   daemon_uid = geteuid();
3092
3093   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3094   if (pid_fd < 0)
3095     pid_fd = check_pidfile();
3096   if (pid_fd < 0)
3097     return pid_fd;
3098
3099   /* open all the listen sockets */
3100   if (config_listen_address_list_len > 0)
3101   {
3102     for (size_t i = 0; i < config_listen_address_list_len; i++)
3103       open_listen_socket (config_listen_address_list[i]);
3104
3105     rrd_free_ptrs((void ***) &config_listen_address_list,
3106                   &config_listen_address_list_len);
3107   }
3108   else
3109   {
3110     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3111         sizeof(default_socket.addr) - 1);
3112     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3113     open_listen_socket (&default_socket);
3114   }
3115
3116   if (listen_fds_num < 1)
3117   {
3118     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3119     goto error;
3120   }
3121
3122   if (!stay_foreground)
3123   {
3124     pid_t child;
3125
3126     child = fork ();
3127     if (child < 0)
3128     {
3129       fprintf (stderr, "daemonize: fork(2) failed.\n");
3130       goto error;
3131     }
3132     else if (child > 0)
3133       exit(0);
3134
3135     /* Become session leader */
3136     setsid ();
3137
3138     /* Open the first three file descriptors to /dev/null */
3139     close (2);
3140     close (1);
3141     close (0);
3142
3143     open ("/dev/null", O_RDWR);
3144     if (dup(0) == -1 || dup(0) == -1){
3145         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3146     }
3147   } /* if (!stay_foreground) */
3148
3149   /* Change into the /tmp directory. */
3150   base_dir = (config_base_dir != NULL)
3151     ? config_base_dir
3152     : "/tmp";
3153
3154   if (chdir (base_dir) != 0)
3155   {
3156     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3157     goto error;
3158   }
3159
3160   install_signal_handlers();
3161
3162   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3163   RRDD_LOG(LOG_INFO, "starting up");
3164
3165   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3166                                 (GDestroyNotify) free_cache_item);
3167   if (cache_tree == NULL)
3168   {
3169     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3170     goto error;
3171   }
3172
3173   return write_pidfile (pid_fd);
3174
3175 error:
3176   remove_pidfile();
3177   return -1;
3178 } /* }}} int daemonize */
3179
3180 static int cleanup (void) /* {{{ */
3181 {
3182   pthread_cond_broadcast (&flush_cond);
3183   pthread_join (flush_thread, NULL);
3184
3185   pthread_cond_broadcast (&queue_cond);
3186   for (int i = 0; i < config_queue_threads; i++)
3187     pthread_join (queue_threads[i], NULL);
3188
3189   if (config_flush_at_shutdown)
3190   {
3191     assert(cache_queue_head == NULL);
3192     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3193   }
3194
3195   free(queue_threads);
3196   free(config_base_dir);
3197
3198   pthread_mutex_lock(&cache_lock);
3199   g_tree_destroy(cache_tree);
3200
3201   pthread_mutex_lock(&journal_lock);
3202   journal_done();
3203
3204   RRDD_LOG(LOG_INFO, "goodbye");
3205   closelog ();
3206
3207   remove_pidfile ();
3208   free(config_pid_file);
3209
3210   return (0);
3211 } /* }}} int cleanup */
3212
3213 static int read_options (int argc, char **argv) /* {{{ */
3214 {
3215   int option;
3216   int status = 0;
3217
3218   socket_permission_clear (&default_socket);
3219
3220   default_socket.socket_group = (gid_t)-1;
3221   default_socket.socket_permissions = (mode_t)-1;
3222
3223   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3224   {
3225     switch (option)
3226     {
3227       case 'O':
3228         opt_no_overwrite = 1;
3229         break;
3230
3231       case 'g':
3232         stay_foreground=1;
3233         break;
3234
3235       case 'l':
3236       {
3237         listen_socket_t *new;
3238
3239         new = malloc(sizeof(listen_socket_t));
3240         if (new == NULL)
3241         {
3242           fprintf(stderr, "read_options: malloc failed.\n");
3243           return(2);
3244         }
3245         memset(new, 0, sizeof(listen_socket_t));
3246
3247         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3248
3249         /* Add permissions to the socket {{{ */
3250         if (default_socket.permissions != 0)
3251         {
3252           socket_permission_copy (new, &default_socket);
3253         }
3254         else /* if (default_socket.permissions == 0) */
3255         {
3256           /* Add permission for ALL commands to the socket. */
3257           size_t i;
3258           for (i = 0; i < list_of_commands_len; i++)
3259           {
3260             status = socket_permission_add (new, list_of_commands[i].cmd);
3261             if (status != 0)
3262             {
3263               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3264                   "socket failed. This should never happen, ever! Sorry.\n",
3265                   list_of_commands[i].cmd);
3266               status = 4;
3267             }
3268           }
3269         }
3270         /* }}} Done adding permissions. */
3271
3272         new->socket_group = default_socket.socket_group;
3273         new->socket_permissions = default_socket.socket_permissions;
3274
3275         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3276                          &config_listen_address_list_len, new))
3277         {
3278           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3279           return (2);
3280         }
3281       }
3282       break;
3283
3284       /* set socket group permissions */
3285       case 's':
3286       {
3287         gid_t group_gid;
3288         struct group *grp;
3289
3290         group_gid = strtoul(optarg, NULL, 10);
3291         if (errno != EINVAL && group_gid>0)
3292         {
3293           /* we were passed a number */
3294           grp = getgrgid(group_gid);
3295         }
3296         else
3297         {
3298           grp = getgrnam(optarg);
3299         }
3300
3301         if (grp)
3302         {
3303           default_socket.socket_group = grp->gr_gid;
3304         }
3305         else
3306         {
3307           /* no idea what the user wanted... */
3308           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3309           return (5);
3310         }
3311       }
3312       break;
3313
3314       /* set socket file permissions */
3315       case 'm':
3316       {
3317         long  tmp;
3318         char *endptr = NULL;
3319
3320         tmp = strtol (optarg, &endptr, 8);
3321         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3322             || (tmp > 07777) || (tmp < 0)) {
3323           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3324               optarg);
3325           return (5);
3326         }
3327
3328         default_socket.socket_permissions = (mode_t)tmp;
3329       }
3330       break;
3331
3332       case 'P':
3333       {
3334         char *optcopy;
3335         char *saveptr;
3336         char *dummy;
3337         char *ptr;
3338
3339         socket_permission_clear (&default_socket);
3340
3341         optcopy = strdup (optarg);
3342         dummy = optcopy;
3343         saveptr = NULL;
3344         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3345         {
3346           dummy = NULL;
3347           status = socket_permission_add (&default_socket, ptr);
3348           if (status != 0)
3349           {
3350             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3351                 "socket failed. Most likely, this permission doesn't "
3352                 "exist. Check your command line.\n", ptr);
3353             status = 4;
3354           }
3355         }
3356
3357         free (optcopy);
3358       }
3359       break;
3360
3361       case 'f':
3362       {
3363         int temp;
3364
3365         temp = atoi (optarg);
3366         if (temp > 0)
3367           config_flush_interval = temp;
3368         else
3369         {
3370           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3371           status = 3;
3372         }
3373       }
3374       break;
3375
3376       case 'w':
3377       {
3378         int temp;
3379
3380         temp = atoi (optarg);
3381         if (temp > 0)
3382           config_write_interval = temp;
3383         else
3384         {
3385           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3386           status = 2;
3387         }
3388       }
3389       break;
3390
3391       case 'z':
3392       {
3393         int temp;
3394
3395         temp = atoi(optarg);
3396         if (temp > 0)
3397           config_write_jitter = temp;
3398         else
3399         {
3400           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3401           status = 2;
3402         }
3403
3404         break;
3405       }
3406
3407       case 't':
3408       {
3409         int threads;
3410         threads = atoi(optarg);
3411         if (threads >= 1)
3412           config_queue_threads = threads;
3413         else
3414         {
3415           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3416           return 1;
3417         }
3418       }
3419       break;
3420
3421       case 'B':
3422         config_write_base_only = 1;
3423         break;
3424
3425       case 'b':
3426       {
3427         size_t len;
3428         char base_realpath[PATH_MAX];
3429
3430         if (config_base_dir != NULL)
3431           free (config_base_dir);
3432         config_base_dir = strdup (optarg);
3433         if (config_base_dir == NULL)
3434         {
3435           fprintf (stderr, "read_options: strdup failed.\n");
3436           return (3);
3437         }
3438
3439         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3440         {
3441           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3442               config_base_dir, rrd_strerror (errno));
3443           return (3);
3444         }
3445
3446         /* make sure that the base directory is not resolved via
3447          * symbolic links.  this makes some performance-enhancing
3448          * assumptions possible (we don't have to resolve paths
3449          * that start with a "/")
3450          */
3451         if (realpath(config_base_dir, base_realpath) == NULL)
3452         {
3453           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3454               "%s\n", config_base_dir, rrd_strerror(errno));
3455           return 5;
3456         }
3457
3458         len = strlen (config_base_dir);
3459         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3460         {
3461           config_base_dir[len - 1] = 0;
3462           len--;
3463         }
3464
3465         if (len < 1)
3466         {
3467           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3468           return (4);
3469         }
3470
3471         _config_base_dir_len = len;
3472
3473         len = strlen (base_realpath);
3474         while ((len > 0) && (base_realpath[len - 1] == '/'))
3475         {
3476           base_realpath[len - 1] = '\0';
3477           len--;
3478         }
3479
3480         if (strncmp(config_base_dir,
3481                          base_realpath, sizeof(base_realpath)) != 0)
3482         {
3483           fprintf(stderr,
3484                   "Base directory (-b) resolved via file system links!\n"
3485                   "Please consult rrdcached '-b' documentation!\n"
3486                   "Consider specifying the real directory (%s)\n",
3487                   base_realpath);
3488           return 5;
3489         }
3490       }
3491       break;
3492
3493       case 'p':
3494       {
3495         if (config_pid_file != NULL)
3496           free (config_pid_file);
3497         config_pid_file = strdup (optarg);
3498         if (config_pid_file == NULL)
3499         {
3500           fprintf (stderr, "read_options: strdup failed.\n");
3501           return (3);
3502         }
3503       }
3504       break;
3505
3506       case 'F':
3507         config_flush_at_shutdown = 1;
3508         break;
3509
3510       case 'j':
3511       {
3512         char journal_dir_actual[PATH_MAX];
3513         const char *dir;
3514         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3515
3516         status = rrd_mkdir_p(dir, 0777);
3517         if (status != 0)
3518         {
3519           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3520               dir, rrd_strerror(errno));
3521           return 6;
3522         }
3523
3524         if (access(dir, R_OK|W_OK|X_OK) != 0)
3525         {
3526           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3527                   errno ? rrd_strerror(errno) : "");
3528           return 6;
3529         }
3530       }
3531       break;
3532
3533       case 'a':
3534       {
3535         int temp = atoi(optarg);
3536         if (temp > 0)
3537           config_alloc_chunk = temp;
3538         else
3539         {
3540           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3541           return 10;
3542         }
3543       }
3544       break;
3545
3546       case 'h':
3547       case '?':
3548         printf ("RRDCacheD %s\n"
3549             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3550             "\n"
3551             "Usage: rrdcached [options]\n"
3552             "\n"
3553             "Valid options are:\n"
3554             "  -l <address>  Socket address to listen to.\n"
3555             "  -P <perms>    Sets the permissions to assign to all following "
3556                             "sockets\n"
3557             "  -w <seconds>  Interval in which to write data.\n"
3558             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3559             "  -t <threads>  Number of write threads.\n"
3560             "  -f <seconds>  Interval in which to flush dead data.\n"
3561             "  -p <file>     Location of the PID-file.\n"
3562             "  -b <dir>      Base directory to change to.\n"
3563             "  -B            Restrict file access to paths within -b <dir>\n"
3564             "  -g            Do not fork and run in the foreground.\n"
3565             "  -j <dir>      Directory in which to create the journal files.\n"
3566             "  -F            Always flush all updates at shutdown\n"
3567             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3568             "                (the socket will also have read/write permissions "
3569                             "for that group)\n"
3570             "  -m <mode>     File permissions (octal) of all following UNIX "
3571                             "sockets\n"
3572             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3573             "  -O            Do not allow CREATE commands to overwrite existing\n"
3574             "                files, even if asked to.\n"
3575             "\n"
3576             "For more information and a detailed description of all options "
3577             "please refer\n"
3578             "to the rrdcached(1) manual page.\n",
3579             VERSION);
3580         if (option == 'h')
3581           status = -1;
3582         else
3583           status = 1;
3584         break;
3585     } /* switch (option) */
3586   } /* while (getopt) */
3587
3588   /* advise the user when values are not sane */
3589   if (config_flush_interval < 2 * config_write_interval)
3590     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3591             " 2x write interval (-w) !\n");
3592   if (config_write_jitter > config_write_interval)
3593     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3594             " write interval (-w) !\n");
3595
3596   if (config_write_base_only && config_base_dir == NULL)
3597     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3598             "  Consult the rrdcached documentation\n");
3599
3600   if (journal_dir == NULL)
3601     config_flush_at_shutdown = 1;
3602
3603   return (status);
3604 } /* }}} int read_options */
3605
3606 int main (int argc, char **argv)
3607 {
3608   int status;
3609
3610   status = read_options (argc, argv);
3611   if (status != 0)
3612   {
3613     if (status < 0)
3614       status = 0;
3615     return (status);
3616   }
3617
3618   status = daemonize ();
3619   if (status != 0)
3620   {
3621     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3622     return (1);
3623   }
3624
3625   journal_init();
3626
3627   /* start the queue threads */
3628   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3629   if (queue_threads == NULL)
3630   {
3631     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3632     cleanup();
3633     return (1);
3634   }
3635   for (int i = 0; i < config_queue_threads; i++)
3636   {
3637     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3638     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3639     if (status != 0)
3640     {
3641       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3642       cleanup();
3643       return (1);
3644     }
3645   }
3646
3647   /* start the flush thread */
3648   memset(&flush_thread, 0, sizeof(flush_thread));
3649   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3650   if (status != 0)
3651   {
3652     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3653     cleanup();
3654     return (1);
3655   }
3656
3657   listen_thread_main (NULL);
3658   cleanup ();
3659
3660   return (0);
3661 } /* int main */
3662
3663 /*
3664  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3665  */