Get the average consolidation working.
[otsdb-go.git] / ots_timeseries.go
1 package otsdb
2
3 import (
4   "fmt"
5   "math"
6   "os"
7   "sort"
8 )
9
10 type OTS_DataPoint struct {
11   TimeStamp float64
12   Rate      float64
13 }
14
15 type OTS_TimeSeries struct {
16   DataPoints []OTS_DataPoint
17 }
18
19 /* Functions for the sort interface. */
20 func (obj *OTS_TimeSeries) Len () int {
21   return (len (obj.DataPoints))
22 }
23
24 func (obj *OTS_TimeSeries) Less (i, j int) bool {
25   if obj.DataPoints[i].TimeStamp < obj.DataPoints[j].TimeStamp {
26     return true
27   }
28   return false
29 }
30
31 func (obj *OTS_TimeSeries) Swap (i, j int) {
32   tmp := obj.DataPoints[i]
33   obj.DataPoints[i] = obj.DataPoints[j]
34   obj.DataPoints[j] = tmp
35 }
36
37 func Fmod64 (a float64, b float64) float64 {
38   tmp := int (a / b)
39   return b * float64 (tmp)
40 }
41
42 func (obj *OTS_TimeSeries) Write (name string) os.Error {
43   fd, err := os.OpenFile (name, os.O_WRONLY, 0666)
44   if err != nil {
45     return err
46   }
47
48   for i := 0; i < len (obj.DataPoints); i++ {
49     data_point := obj.DataPoints[i]
50     str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate)
51
52     fd.WriteString (str)
53   }
54
55   fd.Close ()
56   return nil
57 }
58
59 func ReadFile (name string) (obj *OTS_TimeSeries, err os.Error) {
60   fd, err := os.Open (name)
61   if err != nil {
62     return nil, err
63   }
64
65   /* dp_list := make ([]OTS_DataPoint, intervals_num */
66   obj = new (OTS_TimeSeries)
67
68   for ;; {
69     var timestamp float64
70     var rate float64
71
72     status, err := fmt.Fscanln (fd, "%f,%f", &timestamp, &rate)
73     if err != nil {
74       break
75     } else if status != 2 {
76       continue
77     }
78
79     fmt.Printf ("timestamp = %.3f; rate = %g;\n", timestamp, rate)
80
81     obj.DataPoints = append (obj.DataPoints, OTS_DataPoint{timestamp, rate})
82   }
83
84   fd.Close ()
85   return obj, nil
86 }
87
88 func (obj *OTS_TimeSeries) ConsolidatePointAverage (ts_start, ts_end float64) OTS_DataPoint {
89   var dp OTS_DataPoint
90
91   if ts_start > ts_end {
92     tmp := ts_end
93     ts_end = ts_start
94     ts_start = tmp
95   }
96
97   dp.TimeStamp = ts_end
98   dp.Rate = math.NaN ()
99
100   if len (obj.DataPoints) < 1 {
101     /* The object contains no data. */
102     return dp
103   } else if ts_start > obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp {
104     /* The timespan is after all the data in the object. */
105     return dp
106   } else if ts_end < obj.DataPoints[0].TimeStamp {
107     /* The timespan is before all the data in the object. */
108     return dp
109   }
110
111   /* Find the first rate _after_ the start of the interval. */
112   idx_start := sort.Search (len (obj.DataPoints), func (i int) bool {
113       if obj.DataPoints[i].TimeStamp > ts_start {
114         return true
115       }
116       return false
117   })
118
119   /* The start is outside of the range of the timestamp. With the above checks
120    * this means that the start is _before_ the data in the object. We can thus
121    * use the first elements in the slice. */
122   if idx_start >= len (obj.DataPoints) {
123     idx_start = 0
124   }
125
126   /* There is no data points _between_ ts_start and ts_end. Return the first
127    * measured rate _after_ the desired timespan as the rate of the timespan. */
128   if obj.DataPoints[idx_start].TimeStamp >= ts_end {
129     dp.Rate = obj.DataPoints[idx_start].Rate
130     return dp
131   }
132
133   var timespan_len float64 = 0.0
134   var timespan_sum float64 = 0.0
135   for i := idx_start; i < len (obj.DataPoints); i++ {
136     dp_ts_start := ts_start
137     if (i > 0) && (dp_ts_start < obj.DataPoints[i - 1].TimeStamp) {
138       dp_ts_start = obj.DataPoints[i - 1].TimeStamp
139     }
140     
141     dp_ts_end := obj.DataPoints[i].TimeStamp
142     if dp_ts_end > ts_end {
143       dp_ts_end = ts_end
144     }
145
146     dp_ts_diff := dp_ts_end - dp_ts_start
147     /* assert dp_ts_diff > 0.0 */
148     timespan_len += dp_ts_diff
149     timespan_sum += dp_ts_diff * obj.DataPoints[i].Rate
150
151     if obj.DataPoints[i].TimeStamp >= ts_end {
152       break;
153     }
154   } /* for i */
155
156   dp.Rate = timespan_sum / timespan_len
157   return dp
158 } /* ConsolidatePointAverage */
159
160 func (obj *OTS_TimeSeries) ConsolidateAverage (interval float64) *OTS_TimeSeries {
161   if interval <= 0.0 {
162     return nil
163   }
164
165   ts_raw_first := obj.DataPoints[0].TimeStamp
166   ts_raw_last  := obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp
167
168   fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n",
169       ts_raw_first, ts_raw_last)
170
171   /* Determine the timespan the consolidated data will span. */
172   ts_csl_first := Fmod64 (ts_raw_first, interval)
173   ts_csl_last  := Fmod64 (ts_raw_last,  interval)
174   if ts_csl_first < ts_raw_first {
175     ts_csl_first += interval
176   }
177
178   fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n",
179       ts_csl_first, ts_csl_last)
180
181   intervals_num := int ((ts_csl_last - ts_csl_first) / interval)
182   fmt.Printf ("Got a %gs timespan (%d intervals).\n",
183       ts_csl_last - ts_csl_first, intervals_num)
184
185   /* Allocate return structure */
186   ret_data := new (OTS_TimeSeries)
187   ret_data.DataPoints = make ([]OTS_DataPoint, intervals_num)
188
189   /* FIXME: This is currently a O(n log(n)) algorithm. It should instead be a O(n)
190    * algorithm. This is possible since obj is sorted. The problem is that
191    * ConsolidatePointAverage() does a binary search when we actually know where
192    * to go in the array. */
193   for i := 0; i < intervals_num; i++ {
194       ts := ts_csl_first + (float64 (i + 1) * interval)
195
196       fmt.Printf ("Building data for interval ]%g-%g].\n", ts - interval, ts)
197
198       ret_data.DataPoints[i] = obj.ConsolidatePointAverage (ts - interval, ts)
199   }
200
201   return ret_data
202 } /* ConsolidateAverage */
203
204 func (obj *OTS_TimeSeries) Print () {
205   for i := 0; i < len (obj.DataPoints); i++ {
206     data_point := obj.DataPoints[i]
207     fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate)
208   }
209 } /* Print () */
210
211 /* vim: set syntax=go sw=2 sts=2 et : */