// Package otsdb provides an in-memory time series of (timestamp, rate)
// samples that can be stored in and loaded from text files with one
// "timestamp,rate" pair per line, and consolidated into fixed-width
// intervals.
package otsdb

import (
	"bufio"
	"fmt"
	"math"
	"os"
	"sort"
	"strconv"
	"strings"
)

// OTS_DataPoint is a single sample: the rate recorded at TimeStamp.
type OTS_DataPoint struct {
	TimeStamp float64
	Rate      float64
}

// OTS_TimeSeries holds data points ordered by ascending TimeStamp with at most
// one point per timestamp. AddDataPoint maintains this invariant and the
// lookup methods rely on it, so code that modifies DataPoints directly must
// keep it sorted (sort.Sort on the series restores the order).
//
// For consolidation, the point at index i is taken to describe the rate over
// the span ]DataPoints[i-1].TimeStamp, DataPoints[i].TimeStamp].
type OTS_TimeSeries struct {
	DataPoints []OTS_DataPoint
}

// timestampToInterval rounds a to a multiple of b, truncating towards zero
// (which is rounding down for the non-negative timestamps a series stores).
// b must be positive.
func timestampToInterval(a, b float64) float64 {
	// math.Trunc instead of an int conversion avoids overflow for huge a/b.
	return b * math.Trunc(a/b)
}

// Len, Less and Swap implement sort.Interface, ordering points by TimeStamp.

// Len returns the number of data points.
func (obj *OTS_TimeSeries) Len() int {
	return len(obj.DataPoints)
}

// Less reports whether point i has an earlier timestamp than point j.
func (obj *OTS_TimeSeries) Less(i, j int) bool {
	return obj.DataPoints[i].TimeStamp < obj.DataPoints[j].TimeStamp
}

// Swap exchanges points i and j.
func (obj *OTS_TimeSeries) Swap(i, j int) {
	obj.DataPoints[i], obj.DataPoints[j] = obj.DataPoints[j], obj.DataPoints[i]
}

// Write stores the series in the file called name, one "timestamp,rate" line
// per point (timestamp with three decimals, rate in %g format). The file is
// created if needed and truncated, so nothing from a previous, longer file
// survives. The first write or close error is returned.
func (obj *OTS_TimeSeries) Write(name string) error {
	fd, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	w := bufio.NewWriter(fd)
	for _, dp := range obj.DataPoints {
		// bufio.Writer errors are sticky and are reported by Flush below.
		fmt.Fprintf(w, "%.3f,%g\n", dp.TimeStamp, dp.Rate)
	}
	if err := w.Flush(); err != nil {
		fd.Close()
		return err
	}
	return fd.Close()
}

// AddDataPoint inserts a point while keeping the series sorted by timestamp.
// If a point with exactly the same timestamp exists, its rate is replaced.
// It returns 0 on success and -1 (leaving the series unchanged) if timestamp
// is NaN or negative.
func (obj *OTS_TimeSeries) AddDataPoint(timestamp, rate float64) int {
	if math.IsNaN(timestamp) || timestamp < 0.0 {
		return -1
	}

	n := len(obj.DataPoints)
	// Usual case: the new point is newer than everything stored so far.
	// len() rather than a nil check, so an empty non-nil slice is handled too.
	if n == 0 || timestamp > obj.DataPoints[n-1].TimeStamp {
		obj.DataPoints = append(obj.DataPoints, OTS_DataPoint{timestamp, rate})
		return 0
	}

	// First index whose timestamp is >= the new one (O(log n)). Since
	// timestamp <= the last stored timestamp here, index < n.
	index := sort.Search(n, func(i int) bool {
		return obj.DataPoints[i].TimeStamp >= timestamp
	})

	if obj.DataPoints[index].TimeStamp == timestamp {
		obj.DataPoints[index].Rate = rate
		return 0
	}

	// Grow by one, shift the tail right with a single copy, then insert.
	obj.DataPoints = append(obj.DataPoints, OTS_DataPoint{})
	copy(obj.DataPoints[index+1:], obj.DataPoints[index:n])
	obj.DataPoints[index] = OTS_DataPoint{timestamp, rate}
	return 0
}

// parseDataPointLine parses one "timestamp,rate" line. Whitespace around
// either value is ignored; ok is false for blank or malformed lines.
func parseDataPointLine(line string) (timestamp, rate float64, ok bool) {
	fields := strings.SplitN(line, ",", 2)
	if len(fields) != 2 {
		return 0, 0, false
	}
	t, err := strconv.ParseFloat(strings.TrimSpace(fields[0]), 64)
	if err != nil {
		return 0, 0, false
	}
	r, err := strconv.ParseFloat(strings.TrimSpace(fields[1]), 64)
	if err != nil {
		return 0, 0, false
	}
	return t, r, true
}

// ReadFile loads a series from the file called name, which holds one
// "timestamp,rate" pair per line (the format produced by Write). Lines need
// not be sorted. Blank or malformed lines, and points AddDataPoint rejects
// (NaN or negative timestamps), are skipped. An error is returned only if the
// file cannot be opened or read.
func ReadFile(name string) (*OTS_TimeSeries, error) {
	fd, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	obj := new(OTS_TimeSeries)
	scanner := bufio.NewScanner(fd)
	for scanner.Scan() {
		timestamp, rate, ok := parseDataPointLine(scanner.Text())
		if !ok {
			continue
		}
		obj.AddDataPoint(timestamp, rate)
	}
	if err = scanner.Err(); err != nil {
		return nil, err
	}
	return obj, nil
}

// ConsolidatePointAverage computes the time-weighted average rate over the
// interval ]tsStart, tsEnd] (the bounds are swapped if given in reverse
// order). Each point contributes its rate for the part of
// ]previous timestamp, its timestamp] overlapping the interval. The first
// point's rate is treated as extending back indefinitely.
//
// The returned point has TimeStamp tsEnd. Its Rate is NaN if the series is
// empty, a bound is NaN, or the interval lies entirely before the first point
// or at/after the last point. If no point falls strictly inside the interval,
// the rate of the first point after it is returned.
func (obj *OTS_TimeSeries) ConsolidatePointAverage(tsStart, tsEnd float64) OTS_DataPoint {
	if tsStart > tsEnd {
		tsStart, tsEnd = tsEnd, tsStart
	}
	dp := OTS_DataPoint{TimeStamp: tsEnd, Rate: math.NaN()}

	points := obj.DataPoints
	n := len(points)
	switch {
	case n == 0:
		return dp
	case math.IsNaN(tsStart) || math.IsNaN(tsEnd):
		return dp
	case tsStart >= points[n-1].TimeStamp:
		// The interval is open at tsStart, so starting exactly at the last
		// timestamp already means "after all the data".
		return dp
	case tsEnd < points[0].TimeStamp:
		// The interval is before all the data.
		return dp
	}

	// First point strictly after tsStart. The checks above guarantee the
	// last point qualifies, so idxStart < n.
	idxStart := sort.Search(n, func(i int) bool {
		return points[i].TimeStamp > tsStart
	})

	// No point lies inside the interval: use the first one after it.
	if points[idxStart].TimeStamp >= tsEnd {
		dp.Rate = points[idxStart].Rate
		return dp
	}

	// FIXME(perf): callers iterating over consecutive intervals pay an
	// O(log n) search per call although the position is already known.
	var spanLen, spanSum float64
	for i := idxStart; i < n; i++ {
		segStart := tsStart
		if i > 0 && segStart < points[i-1].TimeStamp {
			segStart = points[i-1].TimeStamp
		}
		segEnd := points[i].TimeStamp
		if segEnd > tsEnd {
			segEnd = tsEnd
		}
		segLen := segEnd - segStart // >= 0 for a sorted series
		spanLen += segLen
		spanSum += segLen * points[i].Rate

		if points[i].TimeStamp >= tsEnd {
			break
		}
	}

	dp.Rate = spanSum / spanLen
	return dp
}

// TimestampFirst returns the timestamp of the first (earliest) point, or NaN
// if the series is empty.
func (obj *OTS_TimeSeries) TimestampFirst() float64 {
	if len(obj.DataPoints) == 0 {
		return math.NaN()
	}
	return obj.DataPoints[0].TimeStamp
}

// TimestampLast returns the timestamp of the last (latest) point, or NaN if
// the series is empty.
func (obj *OTS_TimeSeries) TimestampLast() float64 {
	n := len(obj.DataPoints)
	if n == 0 {
		return math.NaN()
	}
	return obj.DataPoints[n-1].TimeStamp
}

// ConsolidateAverage resamples the series into consecutive intervals of width
// interval, aligned to multiples of interval. Only intervals whose boundaries
// lie within [first timestamp, last timestamp] are produced. Each resulting
// point has the interval end as TimeStamp and the ConsolidatePointAverage of
// ]end-interval, end] as Rate.
//
// It returns nil if interval is not a positive number. It returns an empty
// series if there is no data or the data spans less than one aligned interval.
func (obj *OTS_TimeSeries) ConsolidateAverage(interval float64) *OTS_TimeSeries {
	// Written as !(x > 0) so that a NaN interval is rejected too.
	if !(interval > 0.0) {
		return nil
	}

	ret := new(OTS_TimeSeries)
	n := len(obj.DataPoints)
	if n == 0 {
		return ret
	}
	rawFirst := obj.DataPoints[0].TimeStamp
	rawLast := obj.DataPoints[n-1].TimeStamp

	// First interval boundary at or after the first point, and last boundary
	// at or before the last point.
	cslFirst := timestampToInterval(rawFirst, interval)
	if cslFirst < rawFirst {
		cslFirst += interval
	}
	cslLast := timestampToInterval(rawLast, interval)

	// Both bounds are multiples of interval. Rounding instead of truncating
	// keeps floating-point error from dropping the last interval. A negative
	// count (all data within one interval) yields no intervals rather than a
	// make() panic.
	count := int(math.Floor((cslLast-cslFirst)/interval + 0.5))
	if count <= 0 {
		return ret
	}

	ret.DataPoints = make([]OTS_DataPoint, count)
	for i := range ret.DataPoints {
		end := cslFirst + float64(i+1)*interval
		ret.DataPoints[i] = obj.ConsolidatePointAverage(end-interval, end)
	}
	return ret
}

// Print writes every point to standard output as "[timestamp] rate".
func (obj *OTS_TimeSeries) Print() {
	for _, dp := range obj.DataPoints {
		fmt.Printf("[%g] %g\n", dp.TimeStamp, dp.Rate)
	}
}

/* vim: set syntax=go sw=2 sts=2 et : */