fix off by 1 error
[rrdtool.git] / src / rrd_fetch_libdbi.c
1 #include "rrd_tool.h"
2 #include "unused.h"
3 #include <dbi/dbi.h>
4 #include <time.h>
5
6 /* the structures */
7 struct sql_table_helper {
8   dbi_conn conn;
9   int connected;
10   dbi_result result;
11   char const* filename;
12   char const* dbdriver;
13   char* table_start;
14   char* table_next;
15   char const* where;
16   char * timestamp;
17   char * value;
18 };
19
20 /* the prototypes */
21 static void _sql_close(struct sql_table_helper* th);
22 static int _sql_setparam(struct sql_table_helper* th,char* key, char* value);
23 static int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value,int ordered);
24 static char* _find_next_separator(char* start,char separator);
25 static char* _find_next_separator_twice(char*start,char separator);
26 static char _hexcharhelper(char c);
27 static int _inline_unescape (char* string);
28 static double rrd_fetch_dbi_double(dbi_result *result,int idx);
29 static long rrd_fetch_dbi_long(dbi_result *result,int idx);
30
31 /* the real code */
32
33 /* helpers to get correctly converted values from DB*/
34 static long rrd_fetch_dbi_long(dbi_result *result,int idx) {
35   char *ptmp="";
36   long value=DNAN;
37   /* get the attributes for this filed */
38   unsigned int attr=dbi_result_get_field_attribs_idx(result,idx);
39   unsigned int type=dbi_result_get_field_type_idx(result,idx);
40   /* return NAN if NULL */
41   if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
42   /* do some conversions */
43   switch (type) {
44     case DBI_TYPE_STRING:
45       ptmp=(char*)dbi_result_get_string_idx(result,idx);
46       value=atoi(ptmp);
47       break;
48     case DBI_TYPE_INTEGER:
49       if        (attr & DBI_INTEGER_SIZE1) { value=dbi_result_get_char_idx(result,idx);
50       } else if (attr & DBI_INTEGER_SIZE2) { value=dbi_result_get_short_idx(result,idx);
51       } else if (attr & DBI_INTEGER_SIZE3) { value=dbi_result_get_int_idx(result,idx);
52       } else if (attr & DBI_INTEGER_SIZE4) { value=dbi_result_get_int_idx(result,idx);
53       } else if (attr & DBI_INTEGER_SIZE8) { value=dbi_result_get_longlong_idx(result,idx);
54       } else {                               value=DNAN;
55         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type INTEGER\n",time(NULL),idx,attr ); }
56       }
57       break;
58     case DBI_TYPE_DECIMAL:
59       if        (attr & DBI_DECIMAL_SIZE4) { value=floor(dbi_result_get_float_idx(result,idx));
60       } else if (attr & DBI_DECIMAL_SIZE8) { value=floor(dbi_result_get_double_idx(result,idx));
61       } else {                               value=DNAN;
62         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type DECIMAL\n",time(NULL),idx,attr ); }
63       }
64       break;
65     case DBI_TYPE_BINARY:
66       attr=dbi_result_get_field_length_idx(result,idx);
67       ptmp=(char*)dbi_result_get_binary_copy_idx(result,idx);
68       ptmp[attr-1]=0;
69       /* check for "known" libdbi error */
70       if (strncmp("ERROR",ptmp,5)==0) {
71         if (!getenv("RRD_NO_LIBDBI_BUG_WARNING")) {
72           fprintf(stderr,"rrdtool_fetch_libDBI: you have possibly triggered a bug in libDBI by using a (TINY,MEDIUM,LONG) TEXT field with mysql\n  this may trigger a core dump in at least one version of libdbi\n  if you are not touched by this bug and you find this message annoying\n  please set the environment-variable RRD_NO_LIBDBI_BUG_WARNING to ignore this message\n");
73         }
74       }
75       /* convert to number */
76       value=atoi(ptmp);
77       /* free pointer */
78       free(ptmp);
79       break;
80     case DBI_TYPE_DATETIME:
81        value=dbi_result_get_datetime_idx(result,idx);
82        break;
83     default:
84       if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported type: %i with attribute %i\n",time(NULL),idx,type,attr ); }
85       value=DNAN;
86       break;
87   }
88   return value;
89 }
90
91 static double rrd_fetch_dbi_double(dbi_result *result,int idx) {
92   char *ptmp="";
93   double value=DNAN;
94   /* get the attributes for this filed */
95   unsigned int attr=dbi_result_get_field_attribs_idx(result,idx);
96   unsigned int type=dbi_result_get_field_type_idx(result,idx);
97   /* return NAN if NULL */
98   if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
99   /* do some conversions */
100   switch (type) {
101     case DBI_TYPE_STRING:
102       ptmp=(char*)dbi_result_get_string_idx(result,idx);
103       value=strtod(ptmp,NULL);
104       break;
105     case DBI_TYPE_INTEGER:
106       if        (attr & DBI_INTEGER_SIZE1) { value=dbi_result_get_char_idx(result,idx);
107       } else if (attr & DBI_INTEGER_SIZE2) { value=dbi_result_get_short_idx(result,idx);
108       } else if (attr & DBI_INTEGER_SIZE3) { value=dbi_result_get_int_idx(result,idx);
109       } else if (attr & DBI_INTEGER_SIZE4) { value=dbi_result_get_int_idx(result,idx);
110       } else if (attr & DBI_INTEGER_SIZE8) { value=dbi_result_get_longlong_idx(result,idx);
111       } else {                               value=DNAN;
112         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type INTEGER\n",time(NULL),idx,attr ); }
113       }
114       break;
115     case DBI_TYPE_DECIMAL:
116       if        (attr & DBI_DECIMAL_SIZE4) { value=dbi_result_get_float_idx(result,idx);
117       } else if (attr & DBI_DECIMAL_SIZE8) { value=dbi_result_get_double_idx(result,idx);
118       } else {                               value=DNAN;
119         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type DECIMAL\n",time(NULL),idx,attr ); }
120       }
121       break;
122     case DBI_TYPE_BINARY:
123       attr=dbi_result_get_field_length_idx(result,idx);
124       ptmp=(char*)dbi_result_get_binary_copy_idx(result,idx);
125       ptmp[attr-1]=0;
126       /* check for "known" libdbi error */
127       if (strncmp("ERROR",ptmp,5)==0) {
128         if (!getenv("RRD_NO_LIBDBI_BUG_WARNING")) {
129           fprintf(stderr,"rrdtool_fetch_libDBI: you have possibly triggered a bug in libDBI by using a (TINY,MEDIUM,LONG) TEXT field with mysql\n  this may trigger a core dump in at least one version of libdbi\n  if you are not touched by this bug and you find this message annoying\n  please set the environment-variable RRD_NO_LIBDBI_BUG_WARNING to ignore this message\n");
130         }
131       }
132       /* convert to number */
133       value=strtod(ptmp,NULL);
134       /* free pointer */
135       free(ptmp);
136       break;
137     case DBI_TYPE_DATETIME:
138        value=dbi_result_get_datetime_idx(result,idx);
139        break;
140     default:
141       if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported type: %i with attribute %i\n",time(NULL),idx,type,attr ); }
142       value=DNAN;
143       break;
144   }
145   return value;
146 }
147
148 static void _sql_close(struct sql_table_helper* th) {
149   /* close only if connected */
150   if (th->conn) {
151     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: close connection\n",time(NULL) ); }
152     /* shutdown dbi */
153     dbi_conn_close(th->conn);
154     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: shutting down libdbi\n",time(NULL) ); }
155     dbi_shutdown();
156     /* and assign empty */
157     th->conn=NULL;
158     th->connected=0;
159   }
160 }
161
162 static int _sql_setparam(struct sql_table_helper* th,char* key, char* value) {
163   char* dbi_errstr=NULL;
164   dbi_driver driver;
165   /* if not connected */
166   if (! th->conn) {
167     /* initialize some stuff */
168     th->table_next=th->table_start;
169     th->result=NULL;
170     th->connected=0;
171     /* initialize db */
172     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: initialize libDBI\n",time(NULL) ); }
173     dbi_initialize(NULL);
174     /* load the driver */
175     driver=dbi_driver_open(th->dbdriver);
176     if (! driver) {
177       rrd_set_error( "libdbi - no such driver: %s (possibly a dynamic link problem of the driver being linked without -ldbi)",th->dbdriver); 
178       return -1; 
179     }
180     /* and connect to driver */
181     th->conn=dbi_conn_open(driver);
182     /* and handle errors */
183     if (! th->conn) { 
184       rrd_set_error( "libdbi - could not open connection to driver %s",th->dbdriver); 
185       dbi_shutdown();
186       return -1;
187     }
188   }
189   if (th->connected) {
190     rrd_set_error( "we are already connected - can not set parameter %s=%s",key,value);
191     _sql_close(th);
192     return -1; 
193   }
194   if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: setting option %s to %s\n",time(NULL),key,value ); }
195   if (dbi_conn_set_option(th->conn,key,value)) {
196     dbi_conn_error(th->conn,(const char**)&dbi_errstr);
197     rrd_set_error( "libdbi: problems setting %s to %s - %s",key,value,dbi_errstr);
198     _sql_close(th);
199     return -1;
200   }
201   return 0;
202 }
203
204 static int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value,int ordered) {
205   char* dbi_errstr=NULL;
206   char sql[10240];
207   time_t startt=0,endt=0;
208   /*connect to the database if needed */
209   if (! th->conn) {
210       rrd_set_error( "libdbi no parameters set for libdbi",th->filename,dbi_errstr);
211       return -1;
212   }
213   if (! th->connected) {
214     /* and now connect */
215     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: connect to DB\n",time(NULL) ); }
216     if (dbi_conn_connect(th->conn) <0) {
217       dbi_conn_error(th->conn,(const char**)&dbi_errstr);
218       rrd_set_error( "libdbi: problems connecting to db with connect string %s - error: %s",th->filename,dbi_errstr);
219       _sql_close(th);
220       return -1;
221     }
222     th->connected=1;
223   }
224   /* now find out regarding an existing result-set */
225   if (! th->result) {
226     /* return if table_next is NULL */
227     if (th->table_next==NULL) { 
228     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: reached last table to connect to\n",time(NULL) ); }
229       /* but first close connection */
230       _sql_close(th);
231       /* and return with end of data */
232       return 0;
233     }
234     /* calculate the table to use next */
235     th->table_start=th->table_next;
236     th->table_next=_find_next_separator(th->table_start,'+');
237     _inline_unescape(th->table_start);
238     /* and prepare FULL SQL Statement */
239     if (ordered) {
240       snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s ORDER BY %s",
241                th->timestamp,th->value,th->table_start,th->where,th->timestamp);
242     } else {
243       snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s",
244                th->timestamp,th->value,th->table_start,th->where);
245     }
246     /* and execute sql */
247     if (getenv("RRDDEBUGSQL")) { startt=time(NULL); fprintf(stderr,"RRDDEBUGSQL: %li: executing %s\n",startt,sql); }
248     th->result=dbi_conn_query(th->conn,sql);
249     if (startt) { endt=time(NULL);fprintf(stderr,"RRDDEBUGSQL: %li: timing %li\n",endt,endt-startt); }
250     /* handle error case */
251     if (! th->result) {
252       dbi_conn_error(th->conn,(const char**)&dbi_errstr);      
253       if (startt) { fprintf(stderr,"RRDDEBUGSQL: %li: error %s\n",endt,dbi_errstr); }
254       rrd_set_error("libdbi: problems with query: %s - errormessage: %s",sql,dbi_errstr);
255       _sql_close(th);
256       return -1;
257     }
258   }
259   /* and now fetch key and value */
260   if (! dbi_result_next_row(th->result)) {
261     /* free result */
262     dbi_result_free(th->result);
263     th->result=NULL;
264     /* and call recursively - this will open the next table or close connection as a whole*/
265     return _sql_fetchrow(th,timestamp,value,ordered);
266   } 
267   /* and return with flag for one value */
268   *timestamp=rrd_fetch_dbi_long(th->result,1);
269   *value=rrd_fetch_dbi_double(th->result,2);
270   return 1;
271 }
272
273 static char* _find_next_separator(char* start,char separator) {
274   char* found=strchr(start,separator);
275   /* have we found it */
276   if (found) {
277     /* then 0 terminate current string */
278     *found=0; 
279     /* and return the pointer past the separator */
280     return (found+1);
281   }
282   /* not found, so return NULL */
283   return NULL;
284 }
285
286 static char* _find_next_separator_twice(char*start,char separator) {
287   char *found=start;
288   /* find next separator in string*/
289   while (found) {
290     /* if found and the next one is also a separator */
291     if (found[1] == separator) {
292       /* then 0 terminate current string */
293       *found=0;
294       /* and return the pointer past the current one*/
295       return (found+2);
296     }
297     /* find next occurance */
298     found=strchr(found+1,separator);
299   }
300   /* not found, so return NULL */
301   return NULL;
302 }
303
304 static char _hexcharhelper(char c) {
305   switch (c) {
306   case '0': return 0 ; break;
307   case '1': return 1 ; break;
308   case '2': return 2 ; break;
309   case '3': return 3 ; break;
310   case '4': return 4 ; break;
311   case '5': return 5 ; break;
312   case '6': return 6 ; break;
313   case '7': return 7 ; break;
314   case '8': return 8 ; break;
315   case '9': return 9 ; break;
316   case 'a': return 10 ; break;
317   case 'b': return 11 ; break;
318   case 'c': return 12 ; break;
319   case 'd': return 13 ; break;
320   case 'e': return 14 ; break;
321   case 'f': return 15 ; break;
322   case 'A': return 10 ; break;
323   case 'B': return 11 ; break;
324   case 'C': return 12 ; break;
325   case 'D': return 13 ; break;
326   case 'E': return 14 ; break;
327   case 'F': return 15 ; break;
328   }
329   return -1;
330 }
331
332 static int _inline_unescape (char* string) {
333   char *src=string;
334   char *dst=string;
335   char c,h1,h2;
336   while((c= *src)) {
337     src++;
338     if (c == '%') {
339       if (*src == '%') { 
340         /* increase src pointer by 1 skiping second % */
341         src+=1;
342       } else {
343         /* try to calculate hex value from the next 2 values*/
344         h1=_hexcharhelper(*src);
345         if (h1 == (char)-1) {
346           rrd_set_error("string escape error at: %s\n",string);
347           return(1);
348         }
349         h2=_hexcharhelper(*(src+1));
350         if (h1 == (char)-1) {
351           rrd_set_error("string escape error at: %s\n",string);
352           return(1);
353         }
354         c=h2+(h1<<4);
355         /* increase src pointer by 2 skiping 2 chars */
356         src+=2;
357       } 
358     }
359     *dst=c;
360     dst++;
361   }
362   *dst=0;
363   return 0;
364 }
365
366 int
367 rrd_fetch_fn_libdbi(
368     const char     *filename,  /* name of the rrd */
369     enum cf_en     UNUSED(cf_idx), /* consolidation function */
370     time_t         *start,
371     time_t         *end,       /* which time frame do you want ?
372                                 * will be changed to represent reality */
373     unsigned long  *step,      /* which stepsize do you want? 
374                                 * will be changed to represent reality */
375     unsigned long  *ds_cnt,    /* number of data sources in file */
376     char           ***ds_namv, /* names of data_sources */
377     rrd_value_t    **data)     /* two dimensional array containing the data */
378 {
379   /* the separator used */
380   char separator='/';
381   /* a local copy of the filename - used for copying plus some pointer variables */
382   char filenameworkcopy[10240];
383   char *tmpptr=filenameworkcopy;
384   char *nextptr=NULL;
385   char *libdbiargs=NULL;
386   char *sqlargs=NULL;
387   /* the settings for the "works" of rrd */
388   int fillmissing=0;
389   unsigned long minstepsize=300;
390   /* by default assume unixtimestamp */
391   int isunixtime=1;
392   long gmt_offset=0;
393   /* the result-set */
394   long r_timestamp,l_timestamp,d_timestamp;
395   double r_value,l_value,d_value;
396   int r_status;
397   int rows;
398   long idx;
399   int derive=0;
400   /* the libdbi connection data and the table_help structure */
401   struct sql_table_helper table_help;
402   char where[10240];
403   table_help.conn=NULL;
404   table_help.where=where;
405   table_help.filename=filename;
406
407   /* some loop variables */
408   int i=0;
409
410   /* check header */
411   if (strncmp("sql",filename,3)!=0) { 
412     rrd_set_error( "formatstring wrong - %s",filename );return -1; 
413   }
414   if (filename[3]!=filename[4]) { 
415     rrd_set_error( "formatstring wrong - %s",filename );return -1; 
416   }
417
418   /* now make this the separator */
419   separator=filename[3];
420
421   /* copy filename for local modifications during parsing */
422   strncpy(filenameworkcopy,filename+5,sizeof(filenameworkcopy));
423
424   /* get the driver */
425   table_help.dbdriver=tmpptr;
426   libdbiargs=_find_next_separator(tmpptr,separator);
427   if (! libdbiargs) { 
428     /* error in argument */
429     rrd_set_error( "formatstring wrong as we did not find \"%c\"- %s",separator,table_help.dbdriver);
430     return -1; 
431   }
432
433   /* now find the next double separator - this defines the args to the database */
434   sqlargs=_find_next_separator_twice(libdbiargs,separator);
435   if (!sqlargs) {
436     rrd_set_error( "formatstring wrong for db arguments as we did not find \"%c%c\" in \"%s\"",separator,separator,libdbiargs);
437     return 1;
438   }
439
440   /* now we can start with the SQL Statement - best to start with this first, 
441      as then the error-handling is easier, as we do not have to handle libdbi shutdown as well */
442
443   /* parse the table(s) */
444   table_help.table_start=sqlargs;
445   nextptr=_find_next_separator(table_help.table_start,separator);
446   if (! nextptr) { 
447     /* error in argument */
448     rrd_set_error( "formatstring wrong - %s",tmpptr);
449     return -1; 
450   }
451   /* hex-unescape the value */
452   if(_inline_unescape(table_help.table_start)) { return -1; }
453
454   /* parse the unix timestamp column */
455   table_help.timestamp=nextptr;
456   nextptr=_find_next_separator(nextptr,separator);
457   if (! nextptr) { 
458     /* error in argument */
459     rrd_set_error( "formatstring wrong - %s",tmpptr);
460     return -1; 
461   }
462   /* if we have leading '*', then we have a TIMEDATE Field*/
463   if (table_help.timestamp[0]=='*') {
464     struct tm tm;
465 #ifdef HAVE_TIMEZONE
466     extern long timezone;
467 #endif
468     time_t t=time(NULL);
469     localtime_r(&t,&tm);
470 #ifdef HAVE_TM_GMTOFF
471     gmt_offset=tm.TM_GMTOFF;
472 #else
473 #ifdef HAVE_TIMEZONE
474     gmt_offset=timezone;
475 #endif
476 #endif
477     isunixtime=0; table_help.timestamp++;
478   }
479   /* hex-unescape the value */
480   if(_inline_unescape(table_help.timestamp)) { return -1; }
481
482   /* parse the value column */
483   table_help.value=nextptr;
484   nextptr=_find_next_separator(nextptr,separator);
485   if (! nextptr) { 
486     /* error in argument */
487     rrd_set_error( "formatstring wrong - %s",tmpptr);
488     return -1; 
489   }
490   /* hex-unescape the value */
491   if(_inline_unescape(table_help.value)) { return -1; }
492   
493   /* now prepare WHERE clause as empty string*/
494   where[0]=0;
495
496   /* and the where clause */
497   sqlargs=nextptr;
498   while(sqlargs) {
499     /* find next separator */
500     nextptr=_find_next_separator(sqlargs,separator);
501     /* now handle fields */
502     if (strcmp(sqlargs,"derive")==0) { /* the derive option with the default allowed max delta */
503       derive=600;
504     } else if (strcmp(sqlargs,"prediction")==0) {
505       rrd_set_error("argument prediction is no longer supported in a DEF - use new generic CDEF-functions instead");
506       return -1;
507     } else if (strcmp(sqlargs,"sigma")==0) {
508       rrd_set_error("argument sigma is no longer supported in a DEF - use new generic CDEF-functions instead");
509       return -1;
510     } else if (*sqlargs==0) { /* ignore empty */
511     } else { /* else add to where string */
512       if (where[0]) {strcat(where," AND ");}
513       strcat(where,sqlargs);
514     }
515     /* and continue loop with next pointer */
516     sqlargs=nextptr;
517   }
518   /* and unescape */
519   if(_inline_unescape(where)) { return -1; }
520
521   /* now parse LIBDBI options - this start initializing libdbi and beyond this point we need to reset the db as well in case of errors*/
522   while (libdbiargs) {
523     /* find separator */
524     nextptr=_find_next_separator(libdbiargs,separator);
525     /* now find =, separating key from value*/
526     tmpptr=_find_next_separator(libdbiargs,'=');
527     if (! tmpptr) { 
528       rrd_set_error( "formatstring wrong for db arguments as we did not find \"=\" in \"%s\"",libdbiargs);
529       _sql_close(&table_help);
530       return 1;
531     }
532     /* hex-unescape the value */
533     if(_inline_unescape(tmpptr)) { return -1; }
534     /* now handle the key/value pair */
535     if (strcmp(libdbiargs,"rrdminstepsize")==0) { /* allow override for minstepsize */
536       i=atoi(tmpptr);if (i>0) { minstepsize=i; }
537     } else if (strcmp(libdbiargs,"rrdfillmissing")==0) { /* allow override for minstepsize */
538       i=atoi(tmpptr);if (i>0) { fillmissing=i; }
539     } else if (strcmp(libdbiargs,"rrdderivemaxstep")==0) { /* allow override for derived max delta */
540       i=atoi(tmpptr);if (i>0) { if (derive) { derive=i; }}
541     } else { /* store in libdbi, as these are parameters */
542       if (_sql_setparam(&table_help,libdbiargs,tmpptr)) { return -1; }
543     }
544     /* and continue loop with next pointer */
545     libdbiargs=nextptr;
546   }
547   
548   /* and modify step if given */
549   if (*step<minstepsize) {*step=minstepsize;}
550   *start-=(*start)%(*step);
551   *end-=(*end)%(*step);
552
553   /* and append the SQL WHERE Clause for the timeframe calculated above (adding AND if required) */
554   if (where[0]) {strcat(where," AND ");}
555   i=strlen(where);
556   if (isunixtime) {
557     snprintf(where+i,sizeof(where)-1-i,"%li < %s AND %s < %li",*start,table_help.timestamp,table_help.timestamp,*end);
558   } else {
559     char tsstart[64];strftime(tsstart,sizeof(tsstart),"%Y-%m-%d %H:%M:%S",localtime(start));
560     char tsend[64];strftime(tsend,sizeof(tsend),"%Y-%m-%d %H:%M:%S",localtime(end));
561     snprintf(where+i,sizeof(where)-1-i,"'%s' < %s AND %s < '%s'",tsstart,table_help.timestamp,table_help.timestamp,tsend);
562   }
563
564   /* and now calculate the number of rows in the resultset... */
565   rows=((*end)-(*start))/(*step)+2;
566   
567   /* define the result set variables/columns returned */
568   *ds_cnt=5;
569   *ds_namv=(char**)malloc((*ds_cnt)*sizeof(char*));
570   for (i=0;i<(int)(*ds_cnt);i++) {
571     tmpptr=(char*)malloc(sizeof(char) * DS_NAM_SIZE);
572     (*ds_namv)[i]=tmpptr;
573     /* now copy what is required */
574     switch (i) {
575     case 0: strncpy(tmpptr,"min",DS_NAM_SIZE-1); break;
576     case 1: strncpy(tmpptr,"avg",DS_NAM_SIZE-1); break;
577     case 2: strncpy(tmpptr,"max",DS_NAM_SIZE-1); break;
578     case 3: strncpy(tmpptr,"count",DS_NAM_SIZE-1); break;
579     case 4: strncpy(tmpptr,"sigma",DS_NAM_SIZE-1); break;
580     }
581   }
582
583   /* allocate memory for resultset (with the following columns: min,avg,max,count,sigma) */
584   i=(rows+1) * sizeof(rrd_value_t)*(*ds_cnt);
585   if (((*data) = malloc(i))==NULL){
586     /* and return error */
587     rrd_set_error("malloc failed for %i bytes",i);
588     return(-1);
589   }
590   /* and fill with NAN */
591   for(i=0;i<rows;i++) {
592     (*data)[i*(*ds_cnt)+0]=DNAN; /* MIN */
593     (*data)[i*(*ds_cnt)+1]=DNAN; /* AVG */
594     (*data)[i*(*ds_cnt)+2]=DNAN; /* MAX */
595     (*data)[i*(*ds_cnt)+3]=0;    /* COUNT */
596     (*data)[i*(*ds_cnt)+4]=DNAN; /* SIGMA */
597   }
598   /* and assign undefined values for last - in case of derived calculation */
599   l_value=DNAN;l_timestamp=0;
600   /* here goes the real work processing all data */
601   while((r_status=_sql_fetchrow(&table_help,&r_timestamp,&r_value,derive))>0) {
602     /* processing of value */
603     /* calculate index for the timestamp */
604     r_timestamp-=gmt_offset;
605     idx=(r_timestamp-(*start))/(*step);
606     /* some out of bounds checks on idx */
607     if (idx<0) { idx=0;}
608     if (idx>rows) { idx=rows;}
609     /* and calculate derivative if necessary */
610     if (derive) {
611       /* calc deltas */
612       d_timestamp=r_timestamp-l_timestamp;
613       d_value=r_value-l_value;
614       /* assign current as last values */
615       l_timestamp=r_timestamp;
616       l_value=r_value;
617       /* assign DNAN by default for value */
618       r_value=DNAN;
619       /* check for timestamp delta to be within an acceptable range */
620       if ((d_timestamp>0)&&(d_timestamp<2*derive)) {
621         /* only handle positive delta - avoid wrap-arrounds/counter resets showing up as spikes */
622         if (d_value>0) {
623           /* and normalize to per second */
624           r_value=d_value/d_timestamp;
625         }
626       }
627     }
628     /* only add value if we have a value that is not NAN */
629     if (! isnan(r_value)) {
630       if ((*data)[idx*(*ds_cnt)+3]==0) { /* count is 0 so assign to overwrite DNAN */
631         (*data)[idx*(*ds_cnt)+0]=r_value; /* MIN */
632         (*data)[idx*(*ds_cnt)+1]=r_value; /* AVG */
633         (*data)[idx*(*ds_cnt)+2]=r_value; /* MAX */
634         (*data)[idx*(*ds_cnt)+3]=1;       /* COUNT */
635        (*data)[idx*(*ds_cnt)+4]=r_value*r_value; /* SIGMA */
636       } else {
637         /* MIN */
638         if ((*data)[idx*(*ds_cnt)+0]>r_value) { (*data)[idx*(*ds_cnt)+0]=r_value; }
639         /* AVG - at this moment still sum - corrected in post processing */
640         (*data)[idx*(*ds_cnt)+1]+=r_value;
641         /* MAX */
642         if ((*data)[idx*(*ds_cnt)+2]<r_value) { (*data)[idx*(*ds_cnt)+2]=r_value; }
643         /* COUNT */
644         (*data)[idx*(*ds_cnt)+3]++;
645         /* SIGMA - at this moment still sum of squares - corrected in post processing */
646         (*data)[idx*(*ds_cnt)+4]+=r_value*r_value;
647       }
648     }
649   }
650   /* and check for negativ status, pass back immediately */
651   if (r_status==-1) { return -1; }
652
653   /* post processing */
654   for(idx=0;idx<rows;idx++) {
655     long count=(*data)[idx*(*ds_cnt)+3];
656     if (count>0) {
657       /* calc deviation first */
658       if (count>2) {
659         r_value=count*(*data)[idx*(*ds_cnt)+4]-(*data)[idx*(*ds_cnt)+1]*(*data)[idx*(*ds_cnt)+1];
660         if (r_value<0) { 
661           r_value=DNAN; 
662         } else {
663           r_value=sqrt(r_value/(count*(count-1)));
664         }
665       }
666       (*data)[idx*(*ds_cnt)+4]=r_value;
667       /* now the average */
668       (*data)[idx*(*ds_cnt)+1]/=count;
669     }
670   }
671
672   /* Fill in missing values */
673   fillmissing/=(*step);/* Convert from seconds to steps */
674   if (fillmissing>0) {
675     int copy_left=fillmissing;
676     for(idx=1;idx<rows;idx++) {
677       long count=(*data)[idx*(*ds_cnt)+3];
678       if (count==0) {
679         /* No data this bin */
680         if (copy_left>0) {
681           /* But we can copy from previous */
682           int idx_p=idx-1;
683           (*data)[idx*(*ds_cnt)+0]=(*data)[idx_p*(*ds_cnt)+0];
684           (*data)[idx*(*ds_cnt)+1]=(*data)[idx_p*(*ds_cnt)+1];
685           (*data)[idx*(*ds_cnt)+2]=(*data)[idx_p*(*ds_cnt)+2];
686           (*data)[idx*(*ds_cnt)+3]=(*data)[idx_p*(*ds_cnt)+3];
687           (*data)[idx*(*ds_cnt)+4]=(*data)[idx_p*(*ds_cnt)+4];
688           copy_left--;
689         }
690       }else{
691         copy_left=fillmissing;
692       }
693     }
694   }
695
696   /* and return OK */
697   return 0;
698 }