7cde3f10af8fd75c0c356250506f942bc8d56e39
[rrdtool.git] / src / rrd_fetch_libdbi.c
1 #include "rrd_tool.h"
2 #include <dbi/dbi.h>
3 #include <time.h>
4
5 /* the structures */
6 struct sql_table_helper {
7   dbi_conn conn;
8   int connected;
9   dbi_result result;
10   char const* filename;
11   char const* dbdriver;
12   char* table_start;
13   char* table_next;
14   char const* where;
15   char * timestamp;
16   char * value;
17 };
18
19 /* the prototypes */
20 int _sql_setparam(struct sql_table_helper* th,char* key, char* value);
21 int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value);
22 char* _find_next_separator(char* start,char separator);
23 char* _find_next_separator_twice(char*start,char separator);
24 char _hexcharhelper(char c);
25 int _inline_unescape (char* string);
26 double rrd_fetch_dbi_double(dbi_result *result,int idx);
27 long rrd_fetch_dbi_long(dbi_result *result,int idx);
28
29 /* the real code */
30
31 /* helpers to get correctly converted values from DB*/
32 long rrd_fetch_dbi_long(dbi_result *result,int idx) {
33   char *ptmp="";
34   switch (dbi_result_get_field_type_idx(result,idx)) {
35   case DBI_TYPE_STRING:
36     ptmp=(char*)dbi_result_get_string_idx(result,idx);
37     return atoi(ptmp);
38   default:
39     return dbi_result_get_longlong_idx(result,idx);
40   }
41 }
42
43 double rrd_fetch_dbi_double(dbi_result *result,int idx) {
44   char *ptmp="";
45   /* return NAN if NULL */
46   if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
47   /* do some conversions */
48   switch (dbi_result_get_field_type_idx(result,idx)) {
49   case DBI_TYPE_STRING:
50     ptmp=(char*)dbi_result_get_string_idx(result,idx);
51     return strtod(ptmp,NULL);
52   default:
53     return dbi_result_get_double_idx(result,idx);
54   }
55 }
56
57 int _sql_close(struct sql_table_helper* th) {
58   /* close only if connected */
59   if (th->conn) {
60     /* shutdown dbi */
61     dbi_conn_close(th->conn);
62     dbi_shutdown();
63     /* and assign empty */
64     th->conn=NULL;
65   }
66 }
67
68 int _sql_setparam(struct sql_table_helper* th,char* key, char* value) {
69   char* dbi_errstr=NULL;
70   /* if not connected */
71   if (! th->conn) {
72     /* initialize some stuff */
73     th->table_next=th->table_start;
74     th->result=NULL;
75     th->connected=0;
76     /* initialize db */
77     dbi_initialize(NULL);
78     th->conn=dbi_conn_new(th->dbdriver);
79     /* and handle errors */
80     if (! th->conn) { 
81       dbi_conn_error(th->conn,(const char**)&dbi_errstr);
82       rrd_set_error( "libdbi - no such driver: %s (possibly a dynamic link problem of the driver being linked without -ldbi)",dbi_errstr); 
83       dbi_shutdown();
84       return -1; 
85     }
86   }
87   if (th->connected) {
88     rrd_set_error( "we are already connected - can not set parameter %s=%s",key,value);
89     _sql_close(th);
90     return -1; 
91   }
92   if (dbi_conn_set_option(th->conn,key,value)) {
93     dbi_conn_error(th->conn,(const char**)&dbi_errstr);
94     rrd_set_error( "libdbi: problems setting %s to %s - %s",key,value,dbi_errstr);
95     _sql_close(th);
96     return -1;
97   }
98   return 0;
99 }
100
101 int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value) {
102   char* dbi_errstr=NULL;
103   char sql[10240];
104   time_t startt=0,endt=0;
105   /*connect to the database if needed */
106   if (! th->conn) {
107       rrd_set_error( "libdbi no parameters set for libdbi",th->filename,dbi_errstr);
108       return -1;
109   }
110   if (! th->connected) {
111     /* and now connect */
112     if (dbi_conn_connect(th->conn) <0) {
113       dbi_conn_error(th->conn,(const char**)&dbi_errstr);
114       rrd_set_error( "libdbi: problems connecting to db with connect string %s - error: %s",th->filename,dbi_errstr);
115       _sql_close(th);
116       return -1;
117     }
118     th->connected=1;
119   }
120   /* now find out regarding an existing result-set */
121   if (! th->result) {
122     /* return if table_next is NULL */
123     if (th->table_next==NULL) { 
124       /* but first close connection */
125       _sql_close(th);
126       /* and return with end of data */
127       return 0;
128     }
129     /* calculate the table to use next */
130     th->table_start=th->table_next;
131     th->table_next=_find_next_separator(th->table_start,'+');
132     _inline_unescape(th->table_start);
133     /* and prepare FULL SQL Statement */
134     snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s GROUP BY rrd_time",
135              th->timestamp,th->value,th->table_start,th->where);
136     /* and execute sql */
137     if (getenv("RRDDEBUGSQL")) { startt=time(NULL); fprintf(stderr,"RRDDEBUGSQL: %li: executing %s\n",startt,sql); }
138     th->result=dbi_conn_query(th->conn,sql);
139     if (startt) { endt=time(NULL);fprintf(stderr,"RRDDEBUGSQL: %li: timing %li\n",endt,endt-startt); }
140     /* handle error case */
141     if (! th->result) {
142       dbi_conn_error(th->conn,(const char**)&dbi_errstr);      
143       if (startt) { fprintf(stderr,"RRDDEBUGSQL: %li: error %s\n",endt,dbi_errstr); }
144       rrd_set_error("libdbi: problems with query: %s - errormessage: %s",sql,dbi_errstr);
145       _sql_close(th);
146       return -1;
147     }
148   }
149   /* and now fetch key and value */
150   if (! dbi_result_next_row(th->result)) {
151     /* free result */
152     dbi_result_free(th->result);
153     th->result=NULL;
154     /* and call recursively - this will open the next table or close connection as a whole*/
155     return _sql_fetchrow(th,timestamp,value);
156   } 
157   /* and return with flag for one value */
158   *timestamp=rrd_fetch_dbi_long(th->result,1);
159   *value=rrd_fetch_dbi_double(th->result,2);
160   return 1;
161 }
162
163 char* _find_next_separator(char* start,char separator) {
164   char* found=strchr(start,separator);
165   /* have we found it */
166   if (found) {
167     /* then 0 terminate current string */
168     *found=0; 
169     /* and return the pointer past the separator */
170     return (found+1);
171   }
172   /* not found, so return NULL */
173   return NULL;
174 }
175
176 char* _find_next_separator_twice(char*start,char separator) {
177   char *found=start;
178   /* find next separator in string*/
179   while (found) {
180     /* if found and the next one is also a separator */
181     if (found[1] == separator) {
182       /* then 0 terminate current string */
183       *found=0;
184       /* and return the pointer past the current one*/
185       return (found+2);
186     }
187     /* find next occurance */
188     found=strchr(found+1,separator);
189   }
190   /* not found, so return NULL */
191   return NULL;
192 }
193
194 char _hexcharhelper(char c) {
195   switch (c) {
196   case '0': return 0 ; break;
197   case '1': return 1 ; break;
198   case '2': return 2 ; break;
199   case '3': return 3 ; break;
200   case '4': return 4 ; break;
201   case '5': return 5 ; break;
202   case '6': return 6 ; break;
203   case '7': return 7 ; break;
204   case '8': return 8 ; break;
205   case '9': return 9 ; break;
206   case 'a': return 10 ; break;
207   case 'b': return 11 ; break;
208   case 'c': return 12 ; break;
209   case 'd': return 13 ; break;
210   case 'e': return 14 ; break;
211   case 'f': return 15 ; break;
212   case 'A': return 10 ; break;
213   case 'B': return 11 ; break;
214   case 'C': return 12 ; break;
215   case 'D': return 13 ; break;
216   case 'E': return 14 ; break;
217   case 'F': return 15 ; break;
218   }
219   return -1;
220 }
221
222 int _inline_unescape (char* string) {
223   char *src=string;
224   char *dst=string;
225   char c,h1,h2;
226   while((c= *src)) {
227     src++;
228     if (c == '%') {
229       if (*src == '%') { 
230         /* increase src pointer by 1 skiping second % */
231         src+=1;
232       } else {
233         /* try to calculate hex value from the next 2 values*/
234         h1=_hexcharhelper(*src);
235         if (h1<0) { rrd_set_error( "string escape error at: %s\n",string);return(1); }
236         h2=_hexcharhelper(*(src+1));
237         if (h2<0) { rrd_set_error( "string escape error at: %s\n",string);return(1); }
238         c=h2+(h1<<4);
239         /* increase src pointer by 2 skiping 2 chars */
240         src+=2;
241       } 
242     }
243     *dst=c;
244     dst++;
245   }
246   *dst=0;
247   return 0;
248 }
249
250 int
251 rrd_fetch_fn_libdbi(
252     char           *filename,  /* name of the rrd */
253     enum cf_en     cf_idx,     /* which consolidation function ?*/
254     time_t         *start,
255     time_t         *end,       /* which time frame do you want ?
256                                 * will be changed to represent reality */
257     unsigned long  *step,      /* which stepsize do you want? 
258                                 * will be changed to represent reality */
259     unsigned long  *ds_cnt,    /* number of data sources in file */
260     char           ***ds_namv, /* names of data_sources */
261     rrd_value_t    **data)     /* two dimensional array containing the data */
262 {
263   /* the separator used */
264   char separator='/';
265   /* a local copy of the filename - used for copying plus some pointer variables */
266   char filenameworkcopy[10240];
267   char *tmpptr=filenameworkcopy;
268   char *nextptr=NULL;
269   char *libdbiargs=NULL;
270   char *sqlargs=NULL;
271   /* the settings for the "works" of rrd */
272   int fillmissing=0;
273   unsigned long minstepsize=300;
274   /* the result-set */
275   long r_timestamp,l_timestamp,d_timestamp;
276   double r_value,l_value,d_value;
277   int r_status;
278   int rows;
279   long idx;
280   int derive=0;
281   /* the libdbi connection data and the table_help structure */
282   struct sql_table_helper table_help;
283   char where[10240];
284   table_help.conn=NULL;
285   table_help.where=where;
286
287   /* some loop variables */
288   int i=0;
289
290   /* check header */
291   if (strncmp("sql",filename,3)!=0) { 
292     rrd_set_error( "formatstring wrong - %s",filename );return -1; 
293   }
294   if (filename[3]!=filename[4]) { 
295     rrd_set_error( "formatstring wrong - %s",filename );return -1; 
296   }
297
298   /* now make this the separator */
299   separator=filename[3];
300
301   /* copy filename for local modifications during parsing */
302   strncpy(filenameworkcopy,filename+5,sizeof(filenameworkcopy));
303
304   /* get the driver */
305   table_help.dbdriver=tmpptr;
306   libdbiargs=_find_next_separator(tmpptr,separator);
307   if (! libdbiargs) { 
308     /* error in argument */
309     rrd_set_error( "formatstring wrong as we did not find \"%c\"- %s",separator,table_help.dbdriver);
310     return -1; 
311   }
312
313   /* now find the next double separator - this defines the args to the database */
314   sqlargs=_find_next_separator_twice(libdbiargs,separator);
315   if (!sqlargs) {
316     rrd_set_error( "formatstring wrong for db arguments as we did not find \"%c%c\" in \"%s\"",separator,separator,libdbiargs);
317     return 1;
318   }
319
320   /* now we can start with the SQL Statement - best to start with this first, 
321      as then the error-handling is easier, as we do not have to handle libdbi shutdown as well */
322
323   /* parse the table(s) */
324   table_help.table_start=sqlargs;
325   nextptr=_find_next_separator(table_help.table_start,separator);
326   if (! nextptr) { 
327     /* error in argument */
328     rrd_set_error( "formatstring wrong - %s",tmpptr);
329     return -1; 
330   }
331   /* hex-unescape the value */
332   if(_inline_unescape(table_help.table_start)) { return -1; }
333
334   /* parse the unix timestamp column */
335   table_help.timestamp=nextptr;
336   nextptr=_find_next_separator(nextptr,separator);
337   if (! nextptr) { 
338     /* error in argument */
339     rrd_set_error( "formatstring wrong - %s",tmpptr);
340     return -1; 
341   }
342   /* hex-unescape the value */
343   if(_inline_unescape(table_help.timestamp)) { return -1; }
344
345   /* parse the value column */
346   table_help.value=nextptr;
347   nextptr=_find_next_separator(nextptr,separator);
348   if (! nextptr) { 
349     /* error in argument */
350     rrd_set_error( "formatstring wrong - %s",tmpptr);
351     return -1; 
352   }
353   /* hex-unescape the value */
354   if(_inline_unescape(table_help.value)) { return -1; }
355   
356   /* now prepare WHERE clause as empty string*/
357   where[0]=0;
358
359   /* and the where clause */
360   sqlargs=nextptr;
361   while(sqlargs) {
362     /* find next separator */
363     nextptr=_find_next_separator(sqlargs,separator);
364     /* now handle fields */
365     if (strcmp(sqlargs,"derive")==0) { /* the derive option with the default allowed max delta */
366       derive=600;
367     } else if (strcmp(sqlargs,"prediction")==0) {
368       rrd_set_error("argument prediction is no longer supported in a VDEF - use new generic CDEF-functions instead");
369       return -1;
370     } else if (strcmp(sqlargs,"sigma")==0) {
371       rrd_set_error("argument sigma is no longer supported in a VDEF - use new generic CDEF-functions instead");
372       return -1;
373     } else if (*sqlargs==0) { /* ignore empty */
374     } else { /* else add to where string */
375       if (where[0]) {strcat(where," AND ");}
376       strcat(where,sqlargs);
377     }
378     /* and continue loop with next pointer */
379     sqlargs=nextptr;
380   }
381   /* and unescape */
382   if(_inline_unescape(where)) { return -1; }
383
384   /* now parse LIBDBI options - this start initializing libdbi and beyond this point we need to reset the db as well in case of errors*/
385   while (libdbiargs) {
386     /* find separator */
387     nextptr=_find_next_separator(libdbiargs,separator);
388     /* now find =, separating key from value*/
389     tmpptr=_find_next_separator(libdbiargs,'=');
390     if (! tmpptr) { 
391       rrd_set_error( "formatstring wrong for db arguments as we did not find \"=\" in \"%s\"",libdbiargs);
392       _sql_close(&table_help);
393       return 1;
394     }
395     /* hex-unescape the value */
396     if(_inline_unescape(tmpptr)) { return -1; }
397     /* now handle the key/value pair */
398     if (strcmp(libdbiargs,"rrdminstepsize")==0) { /* allow override for minstepsize */
399       i=atoi(tmpptr);if (i>0) { minstepsize=i; }
400     } else if (strcmp(libdbiargs,"rrdfillmissing")==0) { /* allow override for minstepsize */
401       i=atoi(tmpptr);if (i>0) { fillmissing=i; }
402     } else if (strcmp(libdbiargs,"rrdderivemaxstep")==0) { /* allow override for minstepsize */
403       i=atoi(tmpptr);if (i>0) { derive=i; }
404     } else { /* store in libdbi, as these are parameters */
405       if (_sql_setparam(&table_help,libdbiargs,tmpptr)) { 
406         _sql_close(&table_help);
407         return -1; 
408       }
409     }
410     /* and continue loop with next pointer */
411     libdbiargs=nextptr;
412   }
413   
414   /* and modify step if given */
415   if (*step<minstepsize) {*step=minstepsize;}
416   *start-=(*start)%(*step);
417   *end-=(*end)%(*step);
418
419   /* and append the SQL WHERE Clause for the timeframe calculated above (adding AND if required) */
420   if (where[0]) {strcat(where," AND ");}
421   i=strlen(where);
422   snprintf(where+i,sizeof(where)-1-i,"%li < %s AND %s < %li",*start,table_help.timestamp,table_help.timestamp,*end);
423
424   /* and now calculate the number of rows in the resultset... */
425   rows=((*end)-(*start))/(*step)+2;
426   
427   /* define the result set variables/columns returned */
428   *ds_cnt=5;
429   *ds_namv=(char**)malloc((*ds_cnt)*sizeof(char*));
430   for (i=0;i<(int)(*ds_cnt);i++) {
431     tmpptr=(char*)malloc(sizeof(char) * DS_NAM_SIZE);
432     (*ds_namv)[i]=tmpptr;
433     /* now copy what is required */
434     switch (i) {
435     case 0: strncpy(tmpptr,"min",DS_NAM_SIZE-1); break;
436     case 1: strncpy(tmpptr,"avg",DS_NAM_SIZE-1); break;
437     case 2: strncpy(tmpptr,"max",DS_NAM_SIZE-1); break;
438     case 3: strncpy(tmpptr,"count",DS_NAM_SIZE-1); break;
439     case 4: strncpy(tmpptr,"sigma",DS_NAM_SIZE-1); break;
440     }
441   }
442
443   /* allocate memory for resultset (with the following columns: min,avg,max,count,sigma) */
444   i=rows * sizeof(rrd_value_t)*(*ds_cnt);
445   if (((*data) = malloc(i))==NULL){
446     /* and return error */
447     rrd_set_error("malloc failed for %i bytes",i);
448     return(-1);
449   }
450   /* and fill with NAN */
451   for(i=0;i<rows;i++) {
452     (*data)[i*(*ds_cnt)+0]=DNAN; /* MIN */
453     (*data)[i*(*ds_cnt)+1]=DNAN; /* AVG */
454     (*data)[i*(*ds_cnt)+2]=DNAN; /* MAX */
455     (*data)[i*(*ds_cnt)+3]=0;    /* COUNT */
456     (*data)[i*(*ds_cnt)+4]=DNAN; /* SIGMA */
457   }
458   /* and assign undefined values for last - in case of derived calculation */
459   l_value=DNAN;l_timestamp=0;
460   /* here goes the real work processing all data */
461   while((r_status=_sql_fetchrow(&table_help,&r_timestamp,&r_value))>0) {
462     /* processing of value */
463     /* calculate index for the timestamp */
464     idx=(r_timestamp-(*start))/(*step);
465     /* some out of bounds checks on idx */
466     if (idx<0) { idx=0;}
467     if (idx>rows) { idx=rows;}
468     /* and calculate derivative if necessary */
469     if (derive) {
470       /* calc deltas */
471       d_timestamp=r_timestamp-l_timestamp;
472       d_value=r_value-l_value;
473       /* assign current as last values */
474       l_timestamp=r_timestamp;
475       l_value=r_value;
476       /* assign DNAN by default for value */
477       r_value=DNAN;
478       /* check for timestamp delta to be within an acceptable range */
479       if ((d_timestamp>0)&&(d_timestamp<2*derive)) {
480         /* only handle positive delta - avoid wrap-arrounds/counter resets showing up as spikes */
481         if (d_value>0) {
482           /* and normalize to per second */
483           r_value=d_value/d_timestamp;
484         }
485       }
486     }
487     /* only add value if we have a value that is not NAN */
488     if (! isnan(r_value)) {
489       if ((*data)[idx*(*ds_cnt)+3]==0) { /* count is 0 so assign to overwrite DNAN */
490         (*data)[idx*(*ds_cnt)+0]=r_value; /* MIN */
491         (*data)[idx*(*ds_cnt)+1]=r_value; /* AVG */
492         (*data)[idx*(*ds_cnt)+2]=r_value; /* MAX */
493         (*data)[idx*(*ds_cnt)+3]=1;       /* COUNT */
494         (*data)[idx*(*ds_cnt)+4]=r_value; /* SIGMA */
495       } else {
496         /* MIN */
497         if ((*data)[idx*(*ds_cnt)+0]>r_value) { (*data)[idx*(*ds_cnt)+0]=r_value; }
498         /* AVG - at this moment still sum - corrected in post processing */
499         (*data)[idx*(*ds_cnt)+1]+=r_value;
500         /* MAX */
501         if ((*data)[idx*(*ds_cnt)+2]<r_value) { (*data)[idx*(*ds_cnt)+2]=r_value; }
502         /* COUNT */
503         (*data)[idx*(*ds_cnt)+3]++;
504         /* SIGMA - at this moment still sum of squares - corrected in post processing */
505         (*data)[idx*(*ds_cnt)+4]+=r_value*r_value;
506       }
507     }
508   }
509   /* and check for negativ status, pass back immediately */
510   if (r_status==-1) { return -1; }
511
512   /* post processing */
513   for(idx=0;idx<rows;idx++) {
514     long count=(*data)[idx*(*ds_cnt)+3];
515     if (count>0) {
516       /* calc deviation first */
517       if (count>2) {
518         r_value=count*(*data)[idx*(*ds_cnt)+4]-(*data)[idx*(*ds_cnt)+1]*(*data)[idx*(*ds_cnt)+1];
519         if (r_value<0) { 
520           r_value=DNAN; 
521         } else {
522           r_value=sqrt(r_value/(count*(count-1)));
523         }
524       }
525       (*data)[idx*(*ds_cnt)+4]=r_value;
526       /* now the average */
527       (*data)[idx*(*ds_cnt)+1]/=count;
528     }
529   }
530
531   /* and return OK */
532   return 0;
533 }