OTS_TimeSeries: Add the AddDataPoint() method.
[otsdb-go.git] / ots_timeseries.go
1 package otsdb
2
3 import (
4   "fmt"
5   "math"
6   "os"
7   "sort"
8 )
9
10 type OTS_DataPoint struct {
11   TimeStamp float64
12   Rate      float64
13 }
14
15 type OTS_TimeSeries struct {
16   DataPoints []OTS_DataPoint
17 }
18
19 func timestampToInterval (a float64, b float64) float64 {
20   tmp := int (a / b)
21   return b * float64 (tmp)
22 }
23
24 /* Functions for the sort interface. */
25 func (obj *OTS_TimeSeries) Len () int {
26   return (len (obj.DataPoints))
27 }
28
29 func (obj *OTS_TimeSeries) Less (i, j int) bool {
30   if obj.DataPoints[i].TimeStamp < obj.DataPoints[j].TimeStamp {
31     return true
32   }
33   return false
34 }
35
36 func (obj *OTS_TimeSeries) Swap (i, j int) {
37   obj.DataPoints[i], obj.DataPoints[j] = obj.DataPoints[j], obj.DataPoints[i]
38 }
39
40 func (obj *OTS_TimeSeries) Write (name string) os.Error {
41   fd, err := os.OpenFile (name, os.O_WRONLY, 0666)
42   if err != nil {
43     return err
44   }
45
46   for i := 0; i < len (obj.DataPoints); i++ {
47     data_point := obj.DataPoints[i]
48     str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate)
49
50     fd.WriteString (str)
51   }
52
53   fd.Close ()
54   return nil
55 }
56
57 func (obj *OTS_TimeSeries) AddDataPoint (timestamp, rate float64) int {
58   if math.IsNaN (timestamp) || (timestamp < 0.0) {
59     return -1
60   }
61
62   /* Handle the usual case first. */
63   if (timestamp > obj.TimestampLast ()) || (obj.DataPoints == nil) {
64     obj.DataPoints = append (obj.DataPoints, OTS_DataPoint{timestamp, rate})
65     return 0
66   }
67
68   /* Find the first index where the timestamp is greater than or equal to the
69    * new timestamp. This is an O(log n) operation. */
70   index := sort.Search (len (obj.DataPoints), func (i int) bool {
71       if obj.DataPoints[i].TimeStamp >= timestamp {
72         return true
73       }
74       return false
75   })
76
77   /* Check for a duplicate time. */
78   if obj.DataPoints[index].TimeStamp == timestamp {
79     obj.DataPoints[index].Rate = rate
80     return 0
81   }
82
83   /* Insert the new datapoint at "index". Currently, this is a O(n) operation.
84    * First, add the new datapoint at the end. */
85   obj.DataPoints = append (obj.DataPoints, OTS_DataPoint{timestamp, rate})
86   /* Now move the datapoint to the position "index". */
87   for i := len (obj.DataPoints) - 2; i >= index; i-- {
88     /* TODO: Is there a faster way to manipulate arrays in Go than to move
89      * elements in bubblesort fashion? */
90     obj.Swap (i, i + 1)
91   }
92
93   return 0
94 }
95
96 func ReadFile (name string) (obj *OTS_TimeSeries, err os.Error) {
97   fd, err := os.Open (name)
98   if err != nil {
99     return nil, err
100   }
101
102   /* dp_list := make ([]OTS_DataPoint, intervals_num */
103   obj = new (OTS_TimeSeries)
104
105   for {
106     var timestamp float64
107     var rate float64
108
109     status, err := fmt.Fscanln (fd, "%f,%f", &timestamp, &rate)
110     if err != nil {
111       break
112     } else if status != 2 {
113       continue
114     }
115
116     fmt.Printf ("timestamp = %.3f; rate = %g;\n", timestamp, rate)
117
118     obj.AddDataPoint (timestamp, rate)
119   }
120
121   fd.Close ()
122   return obj, nil
123 }
124
125 func (obj *OTS_TimeSeries) ConsolidatePointAverage (ts_start, ts_end float64) OTS_DataPoint {
126   var dp OTS_DataPoint
127
128   if ts_start > ts_end {
129     tmp := ts_end
130     ts_end = ts_start
131     ts_start = tmp
132   }
133
134   dp.TimeStamp = ts_end
135   dp.Rate = math.NaN ()
136
137   if len (obj.DataPoints) < 1 {
138     /* The object contains no data. */
139     return dp
140   } else if ts_start > obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp {
141     /* The timespan is after all the data in the object. */
142     return dp
143   } else if ts_end < obj.DataPoints[0].TimeStamp {
144     /* The timespan is before all the data in the object. */
145     return dp
146   }
147
148   /* Find the first rate _after_ the start of the interval. */
149   idx_start := sort.Search (len (obj.DataPoints), func (i int) bool {
150       if obj.DataPoints[i].TimeStamp > ts_start {
151         return true
152       }
153       return false
154   })
155
156   /* The start is outside of the range of the timestamp. With the above checks
157    * this means that the start is _before_ the data in the object. We can thus
158    * use the first elements in the slice. */
159   if idx_start >= len (obj.DataPoints) {
160     idx_start = 0
161   }
162
163   /* There is no data points _between_ ts_start and ts_end. Return the first
164    * measured rate _after_ the desired timespan as the rate of the timespan. */
165   if obj.DataPoints[idx_start].TimeStamp >= ts_end {
166     dp.Rate = obj.DataPoints[idx_start].Rate
167     return dp
168   }
169
170   var timespan_len float64 = 0.0
171   var timespan_sum float64 = 0.0
172   for i := idx_start; i < len (obj.DataPoints); i++ {
173     dp_ts_start := ts_start
174     if (i > 0) && (dp_ts_start < obj.DataPoints[i - 1].TimeStamp) {
175       dp_ts_start = obj.DataPoints[i - 1].TimeStamp
176     }
177     
178     dp_ts_end := obj.DataPoints[i].TimeStamp
179     if dp_ts_end > ts_end {
180       dp_ts_end = ts_end
181     }
182
183     dp_ts_diff := dp_ts_end - dp_ts_start
184     /* assert dp_ts_diff > 0.0 */
185     timespan_len += dp_ts_diff
186     timespan_sum += dp_ts_diff * obj.DataPoints[i].Rate
187
188     if obj.DataPoints[i].TimeStamp >= ts_end {
189       break;
190     }
191   } /* for i */
192
193   dp.Rate = timespan_sum / timespan_len
194   return dp
195 } /* ConsolidatePointAverage */
196
197 func (obj *OTS_TimeSeries) TimestampFirst () float64 {
198   if obj.DataPoints == nil {
199     return math.NaN ()
200   }
201   return obj.DataPoints[0].TimeStamp
202 }
203
204 func (obj *OTS_TimeSeries) TimestampLast () float64 {
205   if obj.DataPoints == nil {
206     return math.NaN ()
207   }
208   return obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp
209 }
210
211 func (obj *OTS_TimeSeries) ConsolidateAverage (interval float64) *OTS_TimeSeries {
212   if interval <= 0.0 {
213     return nil
214   }
215
216   ts_raw_first := obj.DataPoints[0].TimeStamp
217   ts_raw_last  := obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp
218
219   fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n",
220       ts_raw_first, ts_raw_last)
221
222   /* Determine the timespan the consolidated data will span. */
223   ts_csl_first := timestampToInterval (ts_raw_first, interval)
224   ts_csl_last  := timestampToInterval (ts_raw_last,  interval)
225   if ts_csl_first < ts_raw_first {
226     ts_csl_first += interval
227   }
228
229   fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n",
230       ts_csl_first, ts_csl_last)
231
232   intervals_num := int ((ts_csl_last - ts_csl_first) / interval)
233   fmt.Printf ("Got a %gs timespan (%d intervals).\n",
234       ts_csl_last - ts_csl_first, intervals_num)
235
236   /* Allocate return structure */
237   ret_data := new (OTS_TimeSeries)
238   ret_data.DataPoints = make ([]OTS_DataPoint, intervals_num)
239
240   /* FIXME: This is currently a O(n log(n)) algorithm. It should instead be a O(n)
241    * algorithm. This is possible since obj is sorted. The problem is that
242    * ConsolidatePointAverage() does a binary search when we actually know where
243    * to go in the array. */
244   for i := 0; i < intervals_num; i++ {
245       ts := ts_csl_first + (float64 (i + 1) * interval)
246
247       fmt.Printf ("Building data for interval ]%g-%g].\n", ts - interval, ts)
248
249       ret_data.DataPoints[i] = obj.ConsolidatePointAverage (ts - interval, ts)
250   }
251
252   return ret_data
253 } /* ConsolidateAverage */
254
255 func (obj *OTS_TimeSeries) Print () {
256   for i := 0; i < len (obj.DataPoints); i++ {
257     data_point := obj.DataPoints[i]
258     fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate)
259   }
260 } /* Print () */
261
262 /* vim: set syntax=go sw=2 sts=2 et : */