0e8bc939b32eab0bdc5c4999241fdaebb25be7ba
[rrdtool.git] / src / rrd_fetch_libdbi.c
1 #include "rrd_tool.h"
2 #include <dbi/dbi.h>
3 #include <time.h>
4
5 /* the structures */
6 struct sql_table_helper {
7   dbi_conn conn;
8   int connected;
9   dbi_result result;
10   char const* filename;
11   char const* dbdriver;
12   char* table_start;
13   char* table_next;
14   char const* where;
15   char * timestamp;
16   char * value;
17 };
18
19 /* the prototypes */
20 static void _sql_close(struct sql_table_helper* th);
21 static int _sql_setparam(struct sql_table_helper* th,char* key, char* value);
22 static int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value,int ordered);
23 static char* _find_next_separator(char* start,char separator);
24 static char* _find_next_separator_twice(char*start,char separator);
25 static char _hexcharhelper(char c);
26 static int _inline_unescape (char* string);
27 static double rrd_fetch_dbi_double(dbi_result *result,int idx);
28 static long rrd_fetch_dbi_long(dbi_result *result,int idx);
29
30 /* the real code */
31
32 /* helpers to get correctly converted values from DB*/
33 static long rrd_fetch_dbi_long(dbi_result *result,int idx) {
34   char *ptmp="";
35   long value=DNAN;
36   /* get the attributes for this filed */
37   unsigned int attr=dbi_result_get_field_attribs_idx(result,idx);
38   unsigned int type=dbi_result_get_field_type_idx(result,idx);
39   /* return NAN if NULL */
40   if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
41   /* do some conversions */
42   switch (type) {
43     case DBI_TYPE_STRING:
44       ptmp=(char*)dbi_result_get_string_idx(result,idx);
45       value=atoi(ptmp);
46       break;
47     case DBI_TYPE_INTEGER:
48       if        (attr & DBI_INTEGER_SIZE1) { value=dbi_result_get_char_idx(result,idx);
49       } else if (attr & DBI_INTEGER_SIZE2) { value=dbi_result_get_short_idx(result,idx);
50       } else if (attr & DBI_INTEGER_SIZE3) { value=dbi_result_get_int_idx(result,idx);
51       } else if (attr & DBI_INTEGER_SIZE4) { value=dbi_result_get_int_idx(result,idx);
52       } else if (attr & DBI_INTEGER_SIZE8) { value=dbi_result_get_longlong_idx(result,idx);
53       } else {                               value=DNAN;
54         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type INTEGER\n",time(NULL),idx,attr ); }
55       }
56       break;
57     case DBI_TYPE_DECIMAL:
58       if        (attr & DBI_DECIMAL_SIZE4) { value=floor(dbi_result_get_float_idx(result,idx));
59       } else if (attr & DBI_DECIMAL_SIZE8) { value=floor(dbi_result_get_double_idx(result,idx));
60       } else {                               value=DNAN;
61         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type DECIMAL\n",time(NULL),idx,attr ); }
62       }
63       break;
64     case DBI_TYPE_BINARY:
65       attr=dbi_result_get_field_length_idx(result,idx);
66       ptmp=(char*)dbi_result_get_binary_copy_idx(result,idx);
67       ptmp[attr-1]=0;
68       /* check for "known" libdbi error */
69       if (strncmp("ERROR",ptmp,5)==0) {
70         if (!getenv("RRD_NO_LIBDBI_BUG_WARNING")) {
71           fprintf(stderr,"rrdtool_fetch_libDBI: you have possibly triggered a bug in libDBI by using a (TINY,MEDIUM,LONG) TEXT field with mysql\n  this may trigger a core dump in at least one version of libdbi\n  if you are not touched by this bug and you find this message annoying\n  please set the environment-variable RRD_NO_LIBDBI_BUG_WARNING to ignore this message\n");
72         }
73       }
74       /* convert to number */
75       value=atoi(ptmp);
76       /* free pointer */
77       free(ptmp);
78       break;
79     case DBI_TYPE_DATETIME:
80        value=dbi_result_get_datetime_idx(result,idx);
81        break;
82     default:
83       if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported type: %i with attribute %i\n",time(NULL),idx,type,attr ); }
84       value=DNAN;
85       break;
86   }
87   return value;
88 }
89
90 static double rrd_fetch_dbi_double(dbi_result *result,int idx) {
91   char *ptmp="";
92   double value=DNAN;
93   /* get the attributes for this filed */
94   unsigned int attr=dbi_result_get_field_attribs_idx(result,idx);
95   unsigned int type=dbi_result_get_field_type_idx(result,idx);
96   /* return NAN if NULL */
97   if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
98   /* do some conversions */
99   switch (type) {
100     case DBI_TYPE_STRING:
101       ptmp=(char*)dbi_result_get_string_idx(result,idx);
102       value=strtod(ptmp,NULL);
103       break;
104     case DBI_TYPE_INTEGER:
105       if        (attr & DBI_INTEGER_SIZE1) { value=dbi_result_get_char_idx(result,idx);
106       } else if (attr & DBI_INTEGER_SIZE2) { value=dbi_result_get_short_idx(result,idx);
107       } else if (attr & DBI_INTEGER_SIZE3) { value=dbi_result_get_int_idx(result,idx);
108       } else if (attr & DBI_INTEGER_SIZE4) { value=dbi_result_get_int_idx(result,idx);
109       } else if (attr & DBI_INTEGER_SIZE8) { value=dbi_result_get_longlong_idx(result,idx);
110       } else {                               value=DNAN;
111         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type INTEGER\n",time(NULL),idx,attr ); }
112       }
113       break;
114     case DBI_TYPE_DECIMAL:
115       if        (attr & DBI_DECIMAL_SIZE4) { value=dbi_result_get_float_idx(result,idx);
116       } else if (attr & DBI_DECIMAL_SIZE8) { value=dbi_result_get_double_idx(result,idx);
117       } else {                               value=DNAN;
118         if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type DECIMAL\n",time(NULL),idx,attr ); }
119       }
120       break;
121     case DBI_TYPE_BINARY:
122       attr=dbi_result_get_field_length_idx(result,idx);
123       ptmp=(char*)dbi_result_get_binary_copy_idx(result,idx);
124       ptmp[attr-1]=0;
125       /* check for "known" libdbi error */
126       if (strncmp("ERROR",ptmp,5)==0) {
127         if (!getenv("RRD_NO_LIBDBI_BUG_WARNING")) {
128           fprintf(stderr,"rrdtool_fetch_libDBI: you have possibly triggered a bug in libDBI by using a (TINY,MEDIUM,LONG) TEXT field with mysql\n  this may trigger a core dump in at least one version of libdbi\n  if you are not touched by this bug and you find this message annoying\n  please set the environment-variable RRD_NO_LIBDBI_BUG_WARNING to ignore this message\n");
129         }
130       }
131       /* convert to number */
132       value=strtod(ptmp,NULL);
133       /* free pointer */
134       free(ptmp);
135       break;
136     case DBI_TYPE_DATETIME:
137        value=dbi_result_get_datetime_idx(result,idx);
138        break;
139     default:
140       if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported type: %i with attribute %i\n",time(NULL),idx,type,attr ); }
141       value=DNAN;
142       break;
143   }
144   return value;
145 }
146
147 static void _sql_close(struct sql_table_helper* th) {
148   /* close only if connected */
149   if (th->conn) {
150     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: close connection\n",time(NULL) ); }
151     /* shutdown dbi */
152     dbi_conn_close(th->conn);
153     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: shutting down libdbi\n",time(NULL) ); }
154     dbi_shutdown();
155     /* and assign empty */
156     th->conn=NULL;
157     th->connected=0;
158   }
159 }
160
161 static int _sql_setparam(struct sql_table_helper* th,char* key, char* value) {
162   char* dbi_errstr=NULL;
163   dbi_driver driver;
164   /* if not connected */
165   if (! th->conn) {
166     /* initialize some stuff */
167     th->table_next=th->table_start;
168     th->result=NULL;
169     th->connected=0;
170     /* initialize db */
171     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: initialize libDBI\n",time(NULL) ); }
172     dbi_initialize(NULL);
173     /* load the driver */
174     driver=dbi_driver_open(th->dbdriver);
175     if (! driver) {
176       rrd_set_error( "libdbi - no such driver: %s (possibly a dynamic link problem of the driver being linked without -ldbi)",th->dbdriver); 
177       return -1; 
178     }
179     /* and connect to driver */
180     th->conn=dbi_conn_open(driver);
181     /* and handle errors */
182     if (! th->conn) { 
183       rrd_set_error( "libdbi - could not open connection to driver %s",th->dbdriver); 
184       dbi_shutdown();
185       return -1;
186     }
187   }
188   if (th->connected) {
189     rrd_set_error( "we are already connected - can not set parameter %s=%s",key,value);
190     _sql_close(th);
191     return -1; 
192   }
193   if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: setting option %s to %s\n",time(NULL),key,value ); }
194   if (dbi_conn_set_option(th->conn,key,value)) {
195     dbi_conn_error(th->conn,(const char**)&dbi_errstr);
196     rrd_set_error( "libdbi: problems setting %s to %s - %s",key,value,dbi_errstr);
197     _sql_close(th);
198     return -1;
199   }
200   return 0;
201 }
202
203 static int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value,int ordered) {
204   char* dbi_errstr=NULL;
205   char sql[10240];
206   time_t startt=0,endt=0;
207   /*connect to the database if needed */
208   if (! th->conn) {
209       rrd_set_error( "libdbi no parameters set for libdbi",th->filename,dbi_errstr);
210       return -1;
211   }
212   if (! th->connected) {
213     /* and now connect */
214     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: connect to DB\n",time(NULL) ); }
215     if (dbi_conn_connect(th->conn) <0) {
216       dbi_conn_error(th->conn,(const char**)&dbi_errstr);
217       rrd_set_error( "libdbi: problems connecting to db with connect string %s - error: %s",th->filename,dbi_errstr);
218       _sql_close(th);
219       return -1;
220     }
221     th->connected=1;
222   }
223   /* now find out regarding an existing result-set */
224   if (! th->result) {
225     /* return if table_next is NULL */
226     if (th->table_next==NULL) { 
227     if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: reached last table to connect to\n",time(NULL) ); }
228       /* but first close connection */
229       _sql_close(th);
230       /* and return with end of data */
231       return 0;
232     }
233     /* calculate the table to use next */
234     th->table_start=th->table_next;
235     th->table_next=_find_next_separator(th->table_start,'+');
236     _inline_unescape(th->table_start);
237     /* and prepare FULL SQL Statement */
238     if (ordered) {
239       snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s ORDER BY %s",
240                th->timestamp,th->value,th->table_start,th->where,th->timestamp);
241     } else {
242       snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s",
243                th->timestamp,th->value,th->table_start,th->where);
244     }
245     /* and execute sql */
246     if (getenv("RRDDEBUGSQL")) { startt=time(NULL); fprintf(stderr,"RRDDEBUGSQL: %li: executing %s\n",startt,sql); }
247     th->result=dbi_conn_query(th->conn,sql);
248     if (startt) { endt=time(NULL);fprintf(stderr,"RRDDEBUGSQL: %li: timing %li\n",endt,endt-startt); }
249     /* handle error case */
250     if (! th->result) {
251       dbi_conn_error(th->conn,(const char**)&dbi_errstr);      
252       if (startt) { fprintf(stderr,"RRDDEBUGSQL: %li: error %s\n",endt,dbi_errstr); }
253       rrd_set_error("libdbi: problems with query: %s - errormessage: %s",sql,dbi_errstr);
254       _sql_close(th);
255       return -1;
256     }
257   }
258   /* and now fetch key and value */
259   if (! dbi_result_next_row(th->result)) {
260     /* free result */
261     dbi_result_free(th->result);
262     th->result=NULL;
263     /* and call recursively - this will open the next table or close connection as a whole*/
264     return _sql_fetchrow(th,timestamp,value,ordered);
265   } 
266   /* and return with flag for one value */
267   *timestamp=rrd_fetch_dbi_long(th->result,1);
268   *value=rrd_fetch_dbi_double(th->result,2);
269   return 1;
270 }
271
272 static char* _find_next_separator(char* start,char separator) {
273   char* found=strchr(start,separator);
274   /* have we found it */
275   if (found) {
276     /* then 0 terminate current string */
277     *found=0; 
278     /* and return the pointer past the separator */
279     return (found+1);
280   }
281   /* not found, so return NULL */
282   return NULL;
283 }
284
285 static char* _find_next_separator_twice(char*start,char separator) {
286   char *found=start;
287   /* find next separator in string*/
288   while (found) {
289     /* if found and the next one is also a separator */
290     if (found[1] == separator) {
291       /* then 0 terminate current string */
292       *found=0;
293       /* and return the pointer past the current one*/
294       return (found+2);
295     }
296     /* find next occurance */
297     found=strchr(found+1,separator);
298   }
299   /* not found, so return NULL */
300   return NULL;
301 }
302
303 static char _hexcharhelper(char c) {
304   switch (c) {
305   case '0': return 0 ; break;
306   case '1': return 1 ; break;
307   case '2': return 2 ; break;
308   case '3': return 3 ; break;
309   case '4': return 4 ; break;
310   case '5': return 5 ; break;
311   case '6': return 6 ; break;
312   case '7': return 7 ; break;
313   case '8': return 8 ; break;
314   case '9': return 9 ; break;
315   case 'a': return 10 ; break;
316   case 'b': return 11 ; break;
317   case 'c': return 12 ; break;
318   case 'd': return 13 ; break;
319   case 'e': return 14 ; break;
320   case 'f': return 15 ; break;
321   case 'A': return 10 ; break;
322   case 'B': return 11 ; break;
323   case 'C': return 12 ; break;
324   case 'D': return 13 ; break;
325   case 'E': return 14 ; break;
326   case 'F': return 15 ; break;
327   }
328   return -1;
329 }
330
331 static int _inline_unescape (char* string) {
332   char *src=string;
333   char *dst=string;
334   char c,h1,h2;
335   while((c= *src)) {
336     src++;
337     if (c == '%') {
338       if (*src == '%') { 
339         /* increase src pointer by 1 skiping second % */
340         src+=1;
341       } else {
342         /* try to calculate hex value from the next 2 values*/
343         h1=_hexcharhelper(*src);
344         if (h1 == (char)-1) {
345           rrd_set_error("string escape error at: %s\n",string);
346           return(1);
347         }
348         h2=_hexcharhelper(*(src+1));
349         if (h1 == (char)-1) {
350           rrd_set_error("string escape error at: %s\n",string);
351           return(1);
352         }
353         c=h2+(h1<<4);
354         /* increase src pointer by 2 skiping 2 chars */
355         src+=2;
356       } 
357     }
358     *dst=c;
359     dst++;
360   }
361   *dst=0;
362   return 0;
363 }
364
365 int
366 rrd_fetch_fn_libdbi(
367     const char     *filename,  /* name of the rrd */
368     enum cf_en     cf_idx __attribute__((unused)), /* consolidation function */
369     time_t         *start,
370     time_t         *end,       /* which time frame do you want ?
371                                 * will be changed to represent reality */
372     unsigned long  *step,      /* which stepsize do you want? 
373                                 * will be changed to represent reality */
374     unsigned long  *ds_cnt,    /* number of data sources in file */
375     char           ***ds_namv, /* names of data_sources */
376     rrd_value_t    **data)     /* two dimensional array containing the data */
377 {
378   /* the separator used */
379   char separator='/';
380   /* a local copy of the filename - used for copying plus some pointer variables */
381   char filenameworkcopy[10240];
382   char *tmpptr=filenameworkcopy;
383   char *nextptr=NULL;
384   char *libdbiargs=NULL;
385   char *sqlargs=NULL;
386   /* the settings for the "works" of rrd */
387   int fillmissing=0;
388   unsigned long minstepsize=300;
389   /* by default assume unixtimestamp */
390   int isunixtime=1;
391   /* the result-set */
392   long r_timestamp,l_timestamp,d_timestamp;
393   double r_value,l_value,d_value;
394   int r_status;
395   int rows;
396   long idx;
397   int derive=0;
398   /* the libdbi connection data and the table_help structure */
399   struct sql_table_helper table_help;
400   char where[10240];
401   table_help.conn=NULL;
402   table_help.where=where;
403
404   /* some loop variables */
405   int i=0;
406
407   /* check header */
408   if (strncmp("sql",filename,3)!=0) { 
409     rrd_set_error( "formatstring wrong - %s",filename );return -1; 
410   }
411   if (filename[3]!=filename[4]) { 
412     rrd_set_error( "formatstring wrong - %s",filename );return -1; 
413   }
414
415   /* now make this the separator */
416   separator=filename[3];
417
418   /* copy filename for local modifications during parsing */
419   strncpy(filenameworkcopy,filename+5,sizeof(filenameworkcopy));
420
421   /* get the driver */
422   table_help.dbdriver=tmpptr;
423   libdbiargs=_find_next_separator(tmpptr,separator);
424   if (! libdbiargs) { 
425     /* error in argument */
426     rrd_set_error( "formatstring wrong as we did not find \"%c\"- %s",separator,table_help.dbdriver);
427     return -1; 
428   }
429
430   /* now find the next double separator - this defines the args to the database */
431   sqlargs=_find_next_separator_twice(libdbiargs,separator);
432   if (!sqlargs) {
433     rrd_set_error( "formatstring wrong for db arguments as we did not find \"%c%c\" in \"%s\"",separator,separator,libdbiargs);
434     return 1;
435   }
436
437   /* now we can start with the SQL Statement - best to start with this first, 
438      as then the error-handling is easier, as we do not have to handle libdbi shutdown as well */
439
440   /* parse the table(s) */
441   table_help.table_start=sqlargs;
442   nextptr=_find_next_separator(table_help.table_start,separator);
443   if (! nextptr) { 
444     /* error in argument */
445     rrd_set_error( "formatstring wrong - %s",tmpptr);
446     return -1; 
447   }
448   /* hex-unescape the value */
449   if(_inline_unescape(table_help.table_start)) { return -1; }
450
451   /* parse the unix timestamp column */
452   table_help.timestamp=nextptr;
453   nextptr=_find_next_separator(nextptr,separator);
454   if (! nextptr) { 
455     /* error in argument */
456     rrd_set_error( "formatstring wrong - %s",tmpptr);
457     return -1; 
458   }
459   /* if we have leading '*', then we have a TIMEDATE Field*/
460   if (table_help.timestamp[0]=='*') { isunixtime=0; table_help.timestamp++; }
461   /* hex-unescape the value */
462   if(_inline_unescape(table_help.timestamp)) { return -1; }
463
464   /* parse the value column */
465   table_help.value=nextptr;
466   nextptr=_find_next_separator(nextptr,separator);
467   if (! nextptr) { 
468     /* error in argument */
469     rrd_set_error( "formatstring wrong - %s",tmpptr);
470     return -1; 
471   }
472   /* hex-unescape the value */
473   if(_inline_unescape(table_help.value)) { return -1; }
474   
475   /* now prepare WHERE clause as empty string*/
476   where[0]=0;
477
478   /* and the where clause */
479   sqlargs=nextptr;
480   while(sqlargs) {
481     /* find next separator */
482     nextptr=_find_next_separator(sqlargs,separator);
483     /* now handle fields */
484     if (strcmp(sqlargs,"derive")==0) { /* the derive option with the default allowed max delta */
485       derive=600;
486     } else if (strcmp(sqlargs,"prediction")==0) {
487       rrd_set_error("argument prediction is no longer supported in a DEF - use new generic CDEF-functions instead");
488       return -1;
489     } else if (strcmp(sqlargs,"sigma")==0) {
490       rrd_set_error("argument sigma is no longer supported in a DEF - use new generic CDEF-functions instead");
491       return -1;
492     } else if (*sqlargs==0) { /* ignore empty */
493     } else { /* else add to where string */
494       if (where[0]) {strcat(where," AND ");}
495       strcat(where,sqlargs);
496     }
497     /* and continue loop with next pointer */
498     sqlargs=nextptr;
499   }
500   /* and unescape */
501   if(_inline_unescape(where)) { return -1; }
502
503   /* now parse LIBDBI options - this start initializing libdbi and beyond this point we need to reset the db as well in case of errors*/
504   while (libdbiargs) {
505     /* find separator */
506     nextptr=_find_next_separator(libdbiargs,separator);
507     /* now find =, separating key from value*/
508     tmpptr=_find_next_separator(libdbiargs,'=');
509     if (! tmpptr) { 
510       rrd_set_error( "formatstring wrong for db arguments as we did not find \"=\" in \"%s\"",libdbiargs);
511       _sql_close(&table_help);
512       return 1;
513     }
514     /* hex-unescape the value */
515     if(_inline_unescape(tmpptr)) { return -1; }
516     /* now handle the key/value pair */
517     if (strcmp(libdbiargs,"rrdminstepsize")==0) { /* allow override for minstepsize */
518       i=atoi(tmpptr);if (i>0) { minstepsize=i; }
519     } else if (strcmp(libdbiargs,"rrdfillmissing")==0) { /* allow override for minstepsize */
520       i=atoi(tmpptr);if (i>0) { fillmissing=i; }
521     } else if (strcmp(libdbiargs,"rrdderivemaxstep")==0) { /* allow override for derived max delta */
522       i=atoi(tmpptr);if (i>0) { if (derive) { derive=i; }}
523     } else { /* store in libdbi, as these are parameters */
524       if (_sql_setparam(&table_help,libdbiargs,tmpptr)) { return -1; }
525     }
526     /* and continue loop with next pointer */
527     libdbiargs=nextptr;
528   }
529   
530   /* and modify step if given */
531   if (*step<minstepsize) {*step=minstepsize;}
532   *start-=(*start)%(*step);
533   *end-=(*end)%(*step);
534
535   /* and append the SQL WHERE Clause for the timeframe calculated above (adding AND if required) */
536   if (where[0]) {strcat(where," AND ");}
537   i=strlen(where);
538   if (isunixtime) {
539     snprintf(where+i,sizeof(where)-1-i,"%li < %s AND %s < %li",*start,table_help.timestamp,table_help.timestamp,*end);
540   } else {
541     char tsstart[64];strftime(tsstart,sizeof(tsstart),"%Y-%m-%d %H:%M:%S",localtime(start));
542     char tsend[64];strftime(tsend,sizeof(tsend),"%Y-%m-%d %H:%M:%S",localtime(end));
543     snprintf(where+i,sizeof(where)-1-i,"'%s' < %s AND %s < '%s'",tsstart,table_help.timestamp,table_help.timestamp,tsend);
544   }
545
546   /* and now calculate the number of rows in the resultset... */
547   rows=((*end)-(*start))/(*step)+2;
548   
549   /* define the result set variables/columns returned */
550   *ds_cnt=5;
551   *ds_namv=(char**)malloc((*ds_cnt)*sizeof(char*));
552   for (i=0;i<(int)(*ds_cnt);i++) {
553     tmpptr=(char*)malloc(sizeof(char) * DS_NAM_SIZE);
554     (*ds_namv)[i]=tmpptr;
555     /* now copy what is required */
556     switch (i) {
557     case 0: strncpy(tmpptr,"min",DS_NAM_SIZE-1); break;
558     case 1: strncpy(tmpptr,"avg",DS_NAM_SIZE-1); break;
559     case 2: strncpy(tmpptr,"max",DS_NAM_SIZE-1); break;
560     case 3: strncpy(tmpptr,"count",DS_NAM_SIZE-1); break;
561     case 4: strncpy(tmpptr,"sigma",DS_NAM_SIZE-1); break;
562     }
563   }
564
565   /* allocate memory for resultset (with the following columns: min,avg,max,count,sigma) */
566   i=rows * sizeof(rrd_value_t)*(*ds_cnt);
567   if (((*data) = malloc(i))==NULL){
568     /* and return error */
569     rrd_set_error("malloc failed for %i bytes",i);
570     return(-1);
571   }
572   /* and fill with NAN */
573   for(i=0;i<rows;i++) {
574     (*data)[i*(*ds_cnt)+0]=DNAN; /* MIN */
575     (*data)[i*(*ds_cnt)+1]=DNAN; /* AVG */
576     (*data)[i*(*ds_cnt)+2]=DNAN; /* MAX */
577     (*data)[i*(*ds_cnt)+3]=0;    /* COUNT */
578     (*data)[i*(*ds_cnt)+4]=DNAN; /* SIGMA */
579   }
580   /* and assign undefined values for last - in case of derived calculation */
581   l_value=DNAN;l_timestamp=0;
582   /* here goes the real work processing all data */
583   while((r_status=_sql_fetchrow(&table_help,&r_timestamp,&r_value,derive))>0) {
584     /* processing of value */
585     /* calculate index for the timestamp */
586     idx=(r_timestamp-(*start))/(*step);
587     /* some out of bounds checks on idx */
588     if (idx<0) { idx=0;}
589     if (idx>rows) { idx=rows;}
590     /* and calculate derivative if necessary */
591     if (derive) {
592       /* calc deltas */
593       d_timestamp=r_timestamp-l_timestamp;
594       d_value=r_value-l_value;
595       /* assign current as last values */
596       l_timestamp=r_timestamp;
597       l_value=r_value;
598       /* assign DNAN by default for value */
599       r_value=DNAN;
600       /* check for timestamp delta to be within an acceptable range */
601       if ((d_timestamp>0)&&(d_timestamp<2*derive)) {
602         /* only handle positive delta - avoid wrap-arrounds/counter resets showing up as spikes */
603         if (d_value>0) {
604           /* and normalize to per second */
605           r_value=d_value/d_timestamp;
606         }
607       }
608     }
609     /* only add value if we have a value that is not NAN */
610     if (! isnan(r_value)) {
611       if ((*data)[idx*(*ds_cnt)+3]==0) { /* count is 0 so assign to overwrite DNAN */
612         (*data)[idx*(*ds_cnt)+0]=r_value; /* MIN */
613         (*data)[idx*(*ds_cnt)+1]=r_value; /* AVG */
614         (*data)[idx*(*ds_cnt)+2]=r_value; /* MAX */
615         (*data)[idx*(*ds_cnt)+3]=1;       /* COUNT */
616         (*data)[idx*(*ds_cnt)+4]=r_value; /* SIGMA */
617       } else {
618         /* MIN */
619         if ((*data)[idx*(*ds_cnt)+0]>r_value) { (*data)[idx*(*ds_cnt)+0]=r_value; }
620         /* AVG - at this moment still sum - corrected in post processing */
621         (*data)[idx*(*ds_cnt)+1]+=r_value;
622         /* MAX */
623         if ((*data)[idx*(*ds_cnt)+2]<r_value) { (*data)[idx*(*ds_cnt)+2]=r_value; }
624         /* COUNT */
625         (*data)[idx*(*ds_cnt)+3]++;
626         /* SIGMA - at this moment still sum of squares - corrected in post processing */
627         (*data)[idx*(*ds_cnt)+4]+=r_value*r_value;
628       }
629     }
630   }
631   /* and check for negativ status, pass back immediately */
632   if (r_status==-1) { return -1; }
633
634   /* post processing */
635   for(idx=0;idx<rows;idx++) {
636     long count=(*data)[idx*(*ds_cnt)+3];
637     if (count>0) {
638       /* calc deviation first */
639       if (count>2) {
640         r_value=count*(*data)[idx*(*ds_cnt)+4]-(*data)[idx*(*ds_cnt)+1]*(*data)[idx*(*ds_cnt)+1];
641         if (r_value<0) { 
642           r_value=DNAN; 
643         } else {
644           r_value=sqrt(r_value/(count*(count-1)));
645         }
646       }
647       (*data)[idx*(*ds_cnt)+4]=r_value;
648       /* now the average */
649       (*data)[idx*(*ds_cnt)+1]/=count;
650     }
651   }
652
653   /* and return OK */
654   return 0;
655 }