This moves selection of the initial RRA row into the rrd_open.c API
[rrdtool.git] / src / rrd_open.c
1 /*****************************************************************************
2  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
3  *****************************************************************************
4  * rrd_open.c  Open an RRD File
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
8
9 #include "rrd_tool.h"
10 #include "unused.h"
11 #define MEMBLK 8192
12
13 /* DEBUG 2 prints information obtained via mincore(2) */
14 #define DEBUG 1
15 /* do not calculate exact madvise hints but assume 1 page for headers and
16  * set DONTNEED for the rest, which is assumed to be data */
17 /* Avoid calling madvise on areas that were already hinted. May be benefical if
18  * your syscalls are very slow */
19
20 #ifdef HAVE_MMAP
21 /* the cast to void* is there to avoid this warning seen on ia64 with certain
22    versions of gcc: 'cast increases required alignment of target type'
23 */
24 #define __rrd_read(dst, dst_t, cnt) { \
25         size_t wanted = sizeof(dst_t)*(cnt); \
26         if (offset + wanted > rrd_file->file_len) { \
27                 rrd_set_error("reached EOF while loading header " #dst); \
28                 goto out_nullify_head; \
29         } \
30         (dst) = (dst_t*)(void*) (data + offset); \
31         offset += wanted; \
32     }
33 #else
34 #define __rrd_read(dst, dst_t, cnt) { \
35         size_t wanted = sizeof(dst_t)*(cnt); \
36         size_t got; \
37         if ((dst = malloc(wanted)) == NULL) { \
38                 rrd_set_error(#dst " malloc"); \
39                 goto out_nullify_head; \
40         } \
41         got = read (rrd_file->fd, dst, wanted); \
42         if (got != wanted) { \
43                 rrd_set_error("short read while reading header " #dst); \
44                 goto out_nullify_head; \
45         } \
46         offset += got; \
47     }
48 #endif
49
50 /* get the address of the start of this page */
51 #if defined USE_MADVISE || defined HAVE_POSIX_FADVISE
52 #ifndef PAGE_START
53 #define PAGE_START(addr) ((addr)&(~(_page_size-1)))
54 #endif
55 #endif
56
57 long int  rra_random_row(
58     rra_def_t *);
59
60
61 /* Open a database file, return its header and an open filehandle,
62  * positioned to the first cdp in the first rra.
63  * In the error path of rrd_open, only rrd_free(&rrd) has to be called
64  * before returning an error. Do not call rrd_close upon failure of rrd_open.
65  */
66
67 rrd_file_t *rrd_open(
68     const char *const file_name,
69     rrd_t *rrd,
70     unsigned rdwr)
71 {
72     int       flags = 0;
73     mode_t    mode = S_IRUSR;
74     int       version;
75
76 #ifdef HAVE_MMAP
77     ssize_t   _page_size = sysconf(_SC_PAGESIZE);
78     int       mm_prot = PROT_READ, mm_flags = 0;
79     char     *data = MAP_FAILED;
80 #endif
81     off_t     offset = 0;
82     struct stat statb;
83     rrd_file_t *rrd_file = NULL;
84     off_t     newfile_size = 0;
85
86     if ((rdwr & RRD_CREAT) && (rdwr & RRD_CREAT_SETSIZE)) {
87         /* yes bad inline signaling alert, we are using the
88            floatcookie to pass the size in ... only used in resize */
89         newfile_size = (off_t) rrd->stat_head->float_cookie;
90         free(rrd->stat_head);
91     }
92     if(!(rdwr & RRD_CREAT))
93         rrd_init(rrd);
94     rrd_file = malloc(sizeof(rrd_file_t));
95     if (rrd_file == NULL) {
96         rrd_set_error("allocating rrd_file descriptor for '%s'", file_name);
97         return NULL;
98     }
99     memset(rrd_file, 0, sizeof(rrd_file_t));
100
101 #ifdef DEBUG
102     if ((rdwr & (RRD_READONLY | RRD_READWRITE)) ==
103         (RRD_READONLY | RRD_READWRITE)) {
104         /* Both READONLY and READWRITE were given, which is invalid.  */
105         rrd_set_error("in read/write request mask");
106         exit(-1);
107     }
108 #endif
109     if (rdwr & RRD_READONLY) {
110         flags |= O_RDONLY;
111 #ifdef HAVE_MMAP
112         mm_flags = MAP_PRIVATE;
113 # ifdef MAP_NORESERVE
114         mm_flags |= MAP_NORESERVE;  /* readonly, so no swap backing needed */
115 # endif
116 #endif
117     } else {
118         if (rdwr & RRD_READWRITE) {
119             mode |= S_IWUSR;
120             flags |= O_RDWR;
121 #ifdef HAVE_MMAP
122             mm_flags = MAP_SHARED;
123             mm_prot |= PROT_WRITE;
124 #endif
125         }
126         if (rdwr & RRD_CREAT) {
127             flags |= (O_CREAT | O_TRUNC);
128         }
129     }
130     if (rdwr & RRD_READAHEAD) {
131 #ifdef MAP_POPULATE
132         mm_flags |= MAP_POPULATE;   /* populate ptes and data */
133 #endif
134 #if defined MAP_NONBLOCK
135         mm_flags |= MAP_NONBLOCK;   /* just populate ptes */
136 #endif
137     }
138 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
139     flags |= O_BINARY;
140 #endif
141
142     if ((rrd_file->fd = open(file_name, flags, mode)) < 0) {
143         rrd_set_error("opening '%s': %s", file_name, rrd_strerror(errno));
144         goto out_free;
145     }
146
147     /* Better try to avoid seeks as much as possible. stat may be heavy but
148      * many concurrent seeks are even worse.  */
149     if (newfile_size == 0 && ((fstat(rrd_file->fd, &statb)) < 0)) {
150         rrd_set_error("fstat '%s': %s", file_name, rrd_strerror(errno));
151         goto out_close;
152     }
153     if (newfile_size == 0) {
154         rrd_file->file_len = statb.st_size;
155     } else {
156         rrd_file->file_len = newfile_size;
157         lseek(rrd_file->fd, newfile_size - 1, SEEK_SET);
158         write(rrd_file->fd, "\0", 1);   /* poke */
159         lseek(rrd_file->fd, 0, SEEK_SET);
160     }
161 #ifdef HAVE_POSIX_FADVISE
162     /* In general we need no read-ahead when dealing with rrd_files.
163        When we stop reading, it is highly unlikely that we start up again.
164        In this manner we actually save time and diskaccess (and buffer cache).
165        Thanks to Dave Plonka for the Idea of using POSIX_FADV_RANDOM here. */
166     posix_fadvise(rrd_file->fd, 0, 0, POSIX_FADV_RANDOM);
167 #endif
168
169 /*
170         if (rdwr & RRD_READWRITE)
171         {
172            if (setvbuf((rrd_file->fd),NULL,_IONBF,2)) {
173                   rrd_set_error("failed to disable the stream buffer\n");
174                   return (-1);
175            }
176         }
177 */
178
179 #ifdef HAVE_MMAP
180     if(rrd_file->file_len == 0 && (rdwr & RRD_CREAT))
181     {
182         rrd_file->file_start = NULL;
183         goto out_done;
184     }
185     data = mmap(0, rrd_file->file_len, mm_prot, mm_flags,
186                 rrd_file->fd, offset);
187
188     /* lets see if the first read worked */
189     if (data == MAP_FAILED) {
190         rrd_set_error("mmaping file '%s': %s", file_name,
191                       rrd_strerror(errno));
192         goto out_close;
193     }
194     rrd_file->file_start = data;
195     if (rdwr & RRD_CREAT) {
196         memset(data, DNAN, newfile_size - 1);
197         goto out_done;
198     }
199 #endif
200     if (rdwr & RRD_CREAT)
201         goto out_done;
202 #ifdef USE_MADVISE
203     if (rdwr & RRD_COPY) {
204         /* We will read everything in a moment (copying) */
205         madvise(data, rrd_file->file_len, MADV_WILLNEED | MADV_SEQUENTIAL);
206     } else {
207         /* We do not need to read anything in for the moment */
208         madvise(data, rrd_file->file_len, MADV_RANDOM);
209         /* the stat_head will be needed soonish, so hint accordingly */
210         madvise(data, sizeof(stat_head_t), MADV_WILLNEED | MADV_RANDOM);
211     }
212 #endif
213
214     __rrd_read(rrd->stat_head, stat_head_t,
215                1);
216
217     /* lets do some test if we are on track ... */
218     if (memcmp(rrd->stat_head->cookie, RRD_COOKIE, sizeof(RRD_COOKIE)) != 0) {
219         rrd_set_error("'%s' is not an RRD file", file_name);
220         goto out_nullify_head;
221     }
222
223     if (rrd->stat_head->float_cookie != FLOAT_COOKIE) {
224         rrd_set_error("This RRD was created on another architecture");
225         goto out_nullify_head;
226     }
227
228     version = atoi(rrd->stat_head->version);
229
230     if (version > atoi(RRD_VERSION)) {
231         rrd_set_error("can't handle RRD file version %s",
232                       rrd->stat_head->version);
233         goto out_nullify_head;
234     }
235 #if defined USE_MADVISE
236     /* the ds_def will be needed soonish, so hint accordingly */
237     madvise(data + PAGE_START(offset),
238             sizeof(ds_def_t) * rrd->stat_head->ds_cnt, MADV_WILLNEED);
239 #endif
240     __rrd_read(rrd->ds_def, ds_def_t,
241                rrd->stat_head->ds_cnt);
242
243 #if defined USE_MADVISE
244     /* the rra_def will be needed soonish, so hint accordingly */
245     madvise(data + PAGE_START(offset),
246             sizeof(rra_def_t) * rrd->stat_head->rra_cnt, MADV_WILLNEED);
247 #endif
248     __rrd_read(rrd->rra_def, rra_def_t,
249                rrd->stat_head->rra_cnt);
250
251     /* handle different format for the live_head */
252     if (version < 3) {
253         rrd->live_head = (live_head_t *) malloc(sizeof(live_head_t));
254         if (rrd->live_head == NULL) {
255             rrd_set_error("live_head_t malloc");
256             goto out_close;
257         }
258 #if defined USE_MADVISE
259         /* the live_head will be needed soonish, so hint accordingly */
260         madvise(data + PAGE_START(offset), sizeof(time_t), MADV_WILLNEED);
261 #endif
262         __rrd_read(rrd->legacy_last_up, time_t,
263                    1);
264
265         rrd->live_head->last_up = *rrd->legacy_last_up;
266         rrd->live_head->last_up_usec = 0;
267     } else {
268 #if defined USE_MADVISE
269         /* the live_head will be needed soonish, so hint accordingly */
270         madvise(data + PAGE_START(offset),
271                 sizeof(live_head_t), MADV_WILLNEED);
272 #endif
273         __rrd_read(rrd->live_head, live_head_t,
274                    1);
275     }
276     __rrd_read(rrd->pdp_prep, pdp_prep_t,
277                rrd->stat_head->ds_cnt);
278     __rrd_read(rrd->cdp_prep, cdp_prep_t,
279                rrd->stat_head->rra_cnt * rrd->stat_head->ds_cnt);
280     __rrd_read(rrd->rra_ptr, rra_ptr_t,
281                rrd->stat_head->rra_cnt);
282
283     rrd_file->header_len = offset;
284     rrd_file->pos = offset;
285
286     {
287       unsigned long row_cnt = 0;
288       unsigned long i;
289
290       for (i=0; i<rrd->stat_head->rra_cnt; i++)
291         row_cnt += rrd->rra_def[i].row_cnt;
292
293       off_t correct_len = rrd_file->header_len +
294         sizeof(rrd_value_t) * row_cnt * rrd->stat_head->ds_cnt;
295
296       if (correct_len > rrd_file->file_len)
297       {
298         rrd_set_error("'%s' is too small (should be %ld bytes)",
299                       file_name, (long long) correct_len);
300         goto out_nullify_head;
301       }
302     }
303
304   out_done:
305     return (rrd_file);
306   out_nullify_head:
307     rrd->stat_head = NULL;
308   out_close:
309 #ifdef HAVE_MMAP
310     if (data != MAP_FAILED)
311       munmap(data, rrd_file->file_len);
312 #endif
313     close(rrd_file->fd);
314   out_free:
315     free(rrd_file);
316     return NULL;
317 }
318
319
320 #if defined DEBUG && DEBUG > 1
321 /* Print list of in-core pages of a the current rrd_file.  */
322 static
323 void mincore_print(
324     rrd_file_t *rrd_file,
325     char *mark)
326 {
327 #ifdef HAVE_MMAP
328     /* pretty print blocks in core */
329     off_t     off;
330     unsigned char *vec;
331     ssize_t   _page_size = sysconf(_SC_PAGESIZE);
332
333     off = rrd_file->file_len +
334         ((rrd_file->file_len + _page_size - 1) / _page_size);
335     vec = malloc(off);
336     if (vec != NULL) {
337         memset(vec, 0, off);
338         if (mincore(rrd_file->file_start, rrd_file->file_len, vec) == 0) {
339             int       prev;
340             unsigned  is_in = 0, was_in = 0;
341
342             for (off = 0, prev = 0; off < rrd_file->file_len; ++off) {
343                 is_in = vec[off] & 1;   /* if lsb set then is core resident */
344                 if (off == 0)
345                     was_in = is_in;
346                 if (was_in != is_in) {
347                     fprintf(stderr, "%s: %sin core: %p len %ld\n", mark,
348                             was_in ? "" : "not ", vec + prev, off - prev);
349                     was_in = is_in;
350                     prev = off;
351                 }
352             }
353             fprintf(stderr,
354                     "%s: %sin core: %p len %ld\n", mark,
355                     was_in ? "" : "not ", vec + prev, off - prev);
356         } else
357             fprintf(stderr, "mincore: %s", rrd_strerror(errno));
358     }
359 #else
360     fprintf(stderr, "sorry mincore only works with mmap");
361 #endif
362 }
363 #endif                          /* defined DEBUG && DEBUG > 1 */
364
365
366 /* drop cache except for the header and the active pages */
367 void rrd_dontneed(
368     rrd_file_t *rrd_file,
369     rrd_t *rrd)
370 {
371 #if defined USE_MADVISE || defined HAVE_POSIX_FADVISE
372     off_t dontneed_start;
373     off_t rra_start;
374     off_t active_block;
375     unsigned long i;
376     ssize_t   _page_size = sysconf(_SC_PAGESIZE);
377
378     if (rrd_file == NULL) {
379 #if defined DEBUG && DEBUG
380             fprintf (stderr, "rrd_dontneed: Argument 'rrd_file' is NULL.\n");
381 #endif
382             return;
383     }
384
385 #if defined DEBUG && DEBUG > 1
386     mincore_print(rrd_file, "before");
387 #endif
388
389     /* ignoring errors from RRDs that are smaller then the file_len+rounding */
390     rra_start = rrd_file->header_len;
391     dontneed_start = PAGE_START(rra_start) + _page_size;
392     for (i = 0; i < rrd->stat_head->rra_cnt; ++i) {
393         active_block =
394             PAGE_START(rra_start
395                        + rrd->rra_ptr[i].cur_row
396                        * rrd->stat_head->ds_cnt * sizeof(rrd_value_t));
397         if (active_block > dontneed_start) {
398 #ifdef USE_MADVISE
399             madvise(rrd_file->file_start + dontneed_start,
400                     active_block - dontneed_start - 1, MADV_DONTNEED);
401 #endif
402 /* in linux at least only fadvise DONTNEED seems to purge pages from cache */
403 #ifdef HAVE_POSIX_FADVISE
404             posix_fadvise(rrd_file->fd, dontneed_start,
405                           active_block - dontneed_start - 1,
406                           POSIX_FADV_DONTNEED);
407 #endif
408         }
409         dontneed_start = active_block;
410         /* do not release 'hot' block if update for this RAA will occur
411          * within 10 minutes */
412         if (rrd->stat_head->pdp_step * rrd->rra_def[i].pdp_cnt -
413             rrd->live_head->last_up % (rrd->stat_head->pdp_step *
414                                        rrd->rra_def[i].pdp_cnt) < 10 * 60) {
415             dontneed_start += _page_size;
416         }
417         rra_start +=
418             rrd->rra_def[i].row_cnt * rrd->stat_head->ds_cnt *
419             sizeof(rrd_value_t);
420     }
421
422     if (dontneed_start < rrd_file->file_len) {
423 #ifdef USE_MADVISE
424             madvise(rrd_file->file_start + dontneed_start,
425                     rrd_file->file_len - dontneed_start, MADV_DONTNEED);
426 #endif
427 #ifdef HAVE_POSIX_FADVISE
428             posix_fadvise(rrd_file->fd, dontneed_start,
429                           rrd_file->file_len - dontneed_start,
430                           POSIX_FADV_DONTNEED);
431 #endif
432     }
433
434 #if defined DEBUG && DEBUG > 1
435     mincore_print(rrd_file, "after");
436 #endif
437 #endif                          /* without madvise and posix_fadvise ist does not make much sense todo anything */
438 }
439
440
441
442
443
444 int rrd_close(
445     rrd_file_t *rrd_file)
446 {
447     int       ret;
448
449 #ifdef HAVE_MMAP
450     ret = msync(rrd_file->file_start, rrd_file->file_len, MS_ASYNC);
451     if (ret != 0)
452         rrd_set_error("msync rrd_file: %s", rrd_strerror(errno));
453     ret = munmap(rrd_file->file_start, rrd_file->file_len);
454     if (ret != 0)
455         rrd_set_error("munmap rrd_file: %s", rrd_strerror(errno));
456 #endif
457     ret = close(rrd_file->fd);
458     if (ret != 0)
459         rrd_set_error("closing file: %s", rrd_strerror(errno));
460     free(rrd_file);
461     rrd_file = NULL;
462     return ret;
463 }
464
465
466 /* Set position of rrd_file.  */
467
468 off_t rrd_seek(
469     rrd_file_t *rrd_file,
470     off_t off,
471     int whence)
472 {
473     off_t     ret = 0;
474
475 #ifdef HAVE_MMAP
476     if (whence == SEEK_SET)
477         rrd_file->pos = off;
478     else if (whence == SEEK_CUR)
479         rrd_file->pos += off;
480     else if (whence == SEEK_END)
481         rrd_file->pos = rrd_file->file_len + off;
482 #else
483     ret = lseek(rrd_file->fd, off, whence);
484     if (ret < 0)
485         rrd_set_error("lseek: %s", rrd_strerror(errno));
486     rrd_file->pos = ret;
487 #endif
488     /* mimic fseek, which returns 0 upon success */
489     return ret < 0;     /*XXX: or just ret to mimic lseek */
490 }
491
492
493 /* Get current position in rrd_file.  */
494
495 off_t rrd_tell(
496     rrd_file_t *rrd_file)
497 {
498     return rrd_file->pos;
499 }
500
501
502 /* Read count bytes into buffer buf, starting at rrd_file->pos.
503  * Returns the number of bytes read or <0 on error.  */
504
505 ssize_t rrd_read(
506     rrd_file_t *rrd_file,
507     void *buf,
508     size_t count)
509 {
510 #ifdef HAVE_MMAP
511     size_t    _cnt = count;
512     ssize_t   _surplus;
513
514     if (rrd_file->pos > rrd_file->file_len || _cnt == 0)    /* EOF */
515         return 0;
516     if (buf == NULL)
517         return -1;      /* EINVAL */
518     _surplus = rrd_file->pos + _cnt - rrd_file->file_len;
519     if (_surplus > 0) { /* short read */
520         _cnt -= _surplus;
521     }
522     if (_cnt == 0)
523         return 0;       /* EOF */
524     buf = memcpy(buf, rrd_file->file_start + rrd_file->pos, _cnt);
525
526     rrd_file->pos += _cnt;  /* mimmic read() semantics */
527     return _cnt;
528 #else
529     ssize_t   ret;
530
531     ret = read(rrd_file->fd, buf, count);
532     if (ret > 0)
533         rrd_file->pos += ret;   /* mimmic read() semantics */
534     return ret;
535 #endif
536 }
537
538
539 /* Write count bytes from buffer buf to the current position
540  * rrd_file->pos of rrd_file->fd.
541  * Returns the number of bytes written or <0 on error.  */
542
543 ssize_t rrd_write(
544     rrd_file_t *rrd_file,
545     const void *buf,
546     size_t count)
547 {
548 #ifdef HAVE_MMAP
549     /* These flags are used if creating a new RRD */
550     int       mm_prot = PROT_READ | PROT_WRITE, mm_flags = MAP_SHARED;
551     int old_size = rrd_file->file_len;
552     int new_size = rrd_file->file_len;
553     if (count == 0)
554         return 0;
555     if (buf == NULL)
556         return -1;      /* EINVAL */
557     
558     if((rrd_file->pos + count) > old_size)
559     {
560         new_size = rrd_file->pos + count; 
561         rrd_file->file_len = new_size;
562         lseek(rrd_file->fd, new_size - 1, SEEK_SET);
563         write(rrd_file->fd, "\0", 1);   /* poke */
564         lseek(rrd_file->fd, 0, SEEK_SET);
565         if(rrd_file->file_start == NULL)
566         {
567             rrd_file->file_start = mmap(0, new_size, mm_prot, mm_flags,
568                 rrd_file->fd, 0);
569         }
570         else
571             rrd_file->file_start = mremap(rrd_file->file_start, old_size, new_size, MREMAP_MAYMOVE); 
572
573         if (rrd_file->file_start == MAP_FAILED) {
574             rrd_set_error("m(re)maping file : %s", 
575                       rrd_strerror(errno));
576             return -1;
577         }
578     }
579     memcpy(rrd_file->file_start + rrd_file->pos, buf, count);
580     rrd_file->pos += count;
581     return count;       /* mimmic write() semantics */
582 #else
583     ssize_t   _sz = write(rrd_file->fd, buf, count);
584
585     if (_sz > 0)
586         rrd_file->pos += _sz;
587     return _sz;
588 #endif
589 }
590
591
592 /* flush all data pending to be written to FD.  */
593
594 void rrd_flush(
595     rrd_file_t *rrd_file)
596 {
597     if (fdatasync(rrd_file->fd) != 0) {
598         rrd_set_error("flushing fd %d: %s", rrd_file->fd,
599                       rrd_strerror(errno));
600     }
601 }
602
603
604 /* Initialize RRD header.  */
605
606 void rrd_init(
607     rrd_t *rrd)
608 {
609     rrd->stat_head = NULL;
610     rrd->ds_def = NULL;
611     rrd->rra_def = NULL;
612     rrd->live_head = NULL;
613     rrd->legacy_last_up = NULL;
614     rrd->rra_ptr = NULL;
615     rrd->pdp_prep = NULL;
616     rrd->cdp_prep = NULL;
617     rrd->rrd_value = NULL;
618 }
619
620
621 /* free RRD header data.  */
622
623 #ifdef HAVE_MMAP
624 void rrd_free(
625     rrd_t *rrd)
626 {
627     if (rrd->legacy_last_up) {  /* this gets set for version < 3 only */
628         free(rrd->live_head);
629     }
630 }
631 #else
632 void rrd_free(
633     rrd_t *rrd)
634 {
635     free(rrd->live_head);
636     free(rrd->stat_head);
637     free(rrd->ds_def);
638     free(rrd->rra_def);
639     free(rrd->rra_ptr);
640     free(rrd->pdp_prep);
641     free(rrd->cdp_prep);
642     free(rrd->rrd_value);
643 }
644 #endif
645
646
647 /* routine used by external libraries to free memory allocated by
648  * rrd library */
649
650 void rrd_freemem(
651     void *mem)
652 {
653     free(mem);
654 }
655
656 /*
657  * rra_update informs us about the RRAs being updated
658  * The low level storage API may use this information for
659  * aligning RRAs within stripes, or other performance enhancements
660  */
661 void rrd_notify_row(
662     rrd_file_t *rrd_file,
663     int rra_idx,
664     unsigned long rra_row,
665     time_t rra_time)
666 {
667 }
668
669 /*
670  * This function is called when creating a new RRD
671  * The storage implementation can use this opportunity to select
672  * a sensible starting row within the file.
673  * The default implementation is random, to ensure that all RRAs
674  * don't change to a new disk block at the same time
675  */
676 unsigned long rrd_select_initial_row(
677     rrd_file_t *rrd_file,
678     int rra_idx,
679     rra_def_t *rra
680     )
681 {
682     return rra_random_row(rra);
683 }
684
685 static int rand_init = 0;
686
687 long int rra_random_row(
688     rra_def_t *rra)
689 {
690     if (!rand_init) {
691         srandom((unsigned int) time(NULL) + (unsigned int) getpid());
692         rand_init++;
693     }
694
695     return random() % rra->row_cnt;
696 }
697