From c440adb8c05c78b612296b8f08f0ef7b727eb5e7 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 2 Jun 2011 18:37:31 +0200 Subject: [PATCH] Get the average consolidation working. Also adds a unit test for the central working-horse function, ConsolidatePointAverage(). --- Makefile | 8 ++ compact.go | 161 ------------------------------------- ots_timeseries.go | 211 +++++++++++++++++++++++++++++++++++++++++++++++++ ots_timeseries_test.go | 70 ++++++++++++++++ 4 files changed, 289 insertions(+), 161 deletions(-) create mode 100644 Makefile delete mode 100644 compact.go create mode 100644 ots_timeseries.go create mode 100644 ots_timeseries_test.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3bf9a44 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +GOROOT = /usr/lib/go + +include $(GOROOT)/src/Make.inc + +TARG = ots_timeseries +GOFILES = ots_timeseries.go\ + +include $(GOROOT)/src/Make.pkg diff --git a/compact.go b/compact.go deleted file mode 100644 index 0698f68..0000000 --- a/compact.go +++ /dev/null @@ -1,161 +0,0 @@ -package main - -import ( - "fmt" - "os" -) - -type OTS_DataPoint struct { - TimeStamp float64 - Rate float64 -} - -type OTS_Data struct { - TSData []OTS_DataPoint -} - -/* Functions for the sort interface. */ -func (obj *OTS_Data) Len () int { - return (len (obj.TSData)) -} - -func (obj *OTS_Data) Less (i, j int) bool { - if obj.TSData[i].TimeStamp < obj.TSData[j].TimeStamp { - return true - } - return false -} - -func (obj *OTS_Data) Swap (i, j int) { - tmp := obj.TSData[i] - obj.TSData[i] = obj.TSData[j] - obj.TSData[j] = tmp -} - -func Fmod64 (a float64, b float64) float64 { - tmp := int (a / b) - return b * float64 (tmp) -} - -func (obj *OTS_Data) Write (name string) os.Error { - fd, err := os.OpenFile(name, os.O_WRONLY, 0666) - if err != nil { - return err - } - - for i := 0; i < len (obj.TSData); i++ { - data_point := obj.TSData[i] - str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate) - - fd.WriteString (str) - } - - fd.Close () - return nil -} - -func ReadFile (name string) (obj *OTS_Data, err os.Error) { -} - -func (raw_data *OTS_Data) Consolidate (interval float64) *OTS_Data { - if interval <= 0.0 { - return nil - } - - ts_raw_first := raw_data.TSData[0].TimeStamp - ts_raw_last := ts_raw_first - - /* Determine the first and last data point. - * XXX: In the future, this should be a sorted list! */ - for i := 1; i < len (raw_data.TSData); i++ { - data_point := raw_data.TSData[i] - - if ts_raw_first > data_point.TimeStamp { - ts_raw_first = data_point.TimeStamp - } - - if ts_raw_last < data_point.TimeStamp { - ts_raw_last = data_point.TimeStamp - } - } - - fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n", - ts_raw_first, ts_raw_last) - - /* Determine the timespan the consolidated data will span. */ - ts_csl_first := Fmod64 (ts_raw_first, interval) - ts_csl_last := Fmod64 (ts_raw_last, interval) - if ts_csl_last < ts_raw_last { - ts_csl_last += interval - } - - fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n", - ts_csl_first, ts_csl_last) - - intervals_num := int ((ts_csl_last - ts_csl_first) / interval) - fmt.Printf ("Got a %gs timespan (%d intervals).\n", - ts_csl_last - ts_csl_first, intervals_num) - - /* Allocate return structure */ - ret_data := new (OTS_Data) - ret_data.TSData = make ([]OTS_DataPoint, intervals_num) - - /* FIXME: This is currently a O(n^2) algorithm. It should instead be a O(n) - * algorithm. This is possible if raw_data is sorted (which, obviously, is a - * O(n log(n)) task). */ - for i := 0; i < intervals_num; i++ { - ts := ts_csl_first + (float64 (i) * interval) - sum := 0.0 - num := 0.0 - - fmt.Printf ("Building data for interval %g.\n", ts) - - ret_data.TSData[i].TimeStamp = ts - - for j := 0; j < len (raw_data.TSData); j++ { - data_point := raw_data.TSData[j] - - if ((data_point.TimeStamp < ts) || (data_point.TimeStamp >= (ts + interval))) { - continue - } - - sum += data_point.Rate - num += 1.0 - } - - /* TODO: Be more clever about how this consolidated rate is computed. */ - if num > 0.0 { - ret_data.TSData[i].Rate = sum / num - } - } - - return ret_data -} - -func (obj *OTS_Data) Print () { - for i := 0; i < len (obj.TSData); i++ { - data_point := obj.TSData[i] - fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate) - } -} /* Print () */ - -func main () { - var data_points []OTS_DataPoint - var raw_data *OTS_Data - var new_data *OTS_Data - - data_points = []OTS_DataPoint { - {0.0, 1.0}, - {1.0, 2.0}, - {2.0, 5.0}, - {3.0, 8.0}, - {4.0, 0.0}, - {5.0, 3.0}} - - raw_data = new (OTS_Data) - raw_data.TSData = data_points - - new_data = raw_data.Consolidate (2.0) - - new_data.Print() -} diff --git a/ots_timeseries.go b/ots_timeseries.go new file mode 100644 index 0000000..efa8931 --- /dev/null +++ b/ots_timeseries.go @@ -0,0 +1,211 @@ +package otsdb + +import ( + "fmt" + "math" + "os" + "sort" +) + +type OTS_DataPoint struct { + TimeStamp float64 + Rate float64 +} + +type OTS_TimeSeries struct { + DataPoints []OTS_DataPoint +} + +/* Functions for the sort interface. */ +func (obj *OTS_TimeSeries) Len () int { + return (len (obj.DataPoints)) +} + +func (obj *OTS_TimeSeries) Less (i, j int) bool { + if obj.DataPoints[i].TimeStamp < obj.DataPoints[j].TimeStamp { + return true + } + return false +} + +func (obj *OTS_TimeSeries) Swap (i, j int) { + tmp := obj.DataPoints[i] + obj.DataPoints[i] = obj.DataPoints[j] + obj.DataPoints[j] = tmp +} + +func Fmod64 (a float64, b float64) float64 { + tmp := int (a / b) + return b * float64 (tmp) +} + +func (obj *OTS_TimeSeries) Write (name string) os.Error { + fd, err := os.OpenFile (name, os.O_WRONLY, 0666) + if err != nil { + return err + } + + for i := 0; i < len (obj.DataPoints); i++ { + data_point := obj.DataPoints[i] + str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate) + + fd.WriteString (str) + } + + fd.Close () + return nil +} + +func ReadFile (name string) (obj *OTS_TimeSeries, err os.Error) { + fd, err := os.Open (name) + if err != nil { + return nil, err + } + + /* dp_list := make ([]OTS_DataPoint, intervals_num */ + obj = new (OTS_TimeSeries) + + for ;; { + var timestamp float64 + var rate float64 + + status, err := fmt.Fscanln (fd, "%f,%f", ×tamp, &rate) + if err != nil { + break + } else if status != 2 { + continue + } + + fmt.Printf ("timestamp = %.3f; rate = %g;\n", timestamp, rate) + + obj.DataPoints = append (obj.DataPoints, OTS_DataPoint{timestamp, rate}) + } + + fd.Close () + return obj, nil +} + +func (obj *OTS_TimeSeries) ConsolidatePointAverage (ts_start, ts_end float64) OTS_DataPoint { + var dp OTS_DataPoint + + if ts_start > ts_end { + tmp := ts_end + ts_end = ts_start + ts_start = tmp + } + + dp.TimeStamp = ts_end + dp.Rate = math.NaN () + + if len (obj.DataPoints) < 1 { + /* The object contains no data. */ + return dp + } else if ts_start > obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp { + /* The timespan is after all the data in the object. */ + return dp + } else if ts_end < obj.DataPoints[0].TimeStamp { + /* The timespan is before all the data in the object. */ + return dp + } + + /* Find the first rate _after_ the start of the interval. */ + idx_start := sort.Search (len (obj.DataPoints), func (i int) bool { + if obj.DataPoints[i].TimeStamp > ts_start { + return true + } + return false + }) + + /* The start is outside of the range of the timestamp. With the above checks + * this means that the start is _before_ the data in the object. We can thus + * use the first elements in the slice. */ + if idx_start >= len (obj.DataPoints) { + idx_start = 0 + } + + /* There is no data points _between_ ts_start and ts_end. Return the first + * measured rate _after_ the desired timespan as the rate of the timespan. */ + if obj.DataPoints[idx_start].TimeStamp >= ts_end { + dp.Rate = obj.DataPoints[idx_start].Rate + return dp + } + + var timespan_len float64 = 0.0 + var timespan_sum float64 = 0.0 + for i := idx_start; i < len (obj.DataPoints); i++ { + dp_ts_start := ts_start + if (i > 0) && (dp_ts_start < obj.DataPoints[i - 1].TimeStamp) { + dp_ts_start = obj.DataPoints[i - 1].TimeStamp + } + + dp_ts_end := obj.DataPoints[i].TimeStamp + if dp_ts_end > ts_end { + dp_ts_end = ts_end + } + + dp_ts_diff := dp_ts_end - dp_ts_start + /* assert dp_ts_diff > 0.0 */ + timespan_len += dp_ts_diff + timespan_sum += dp_ts_diff * obj.DataPoints[i].Rate + + if obj.DataPoints[i].TimeStamp >= ts_end { + break; + } + } /* for i */ + + dp.Rate = timespan_sum / timespan_len + return dp +} /* ConsolidatePointAverage */ + +func (obj *OTS_TimeSeries) ConsolidateAverage (interval float64) *OTS_TimeSeries { + if interval <= 0.0 { + return nil + } + + ts_raw_first := obj.DataPoints[0].TimeStamp + ts_raw_last := obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp + + fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n", + ts_raw_first, ts_raw_last) + + /* Determine the timespan the consolidated data will span. */ + ts_csl_first := Fmod64 (ts_raw_first, interval) + ts_csl_last := Fmod64 (ts_raw_last, interval) + if ts_csl_first < ts_raw_first { + ts_csl_first += interval + } + + fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n", + ts_csl_first, ts_csl_last) + + intervals_num := int ((ts_csl_last - ts_csl_first) / interval) + fmt.Printf ("Got a %gs timespan (%d intervals).\n", + ts_csl_last - ts_csl_first, intervals_num) + + /* Allocate return structure */ + ret_data := new (OTS_TimeSeries) + ret_data.DataPoints = make ([]OTS_DataPoint, intervals_num) + + /* FIXME: This is currently a O(n log(n)) algorithm. It should instead be a O(n) + * algorithm. This is possible since obj is sorted. The problem is that + * ConsolidatePointAverage() does a binary search when we actually know where + * to go in the array. */ + for i := 0; i < intervals_num; i++ { + ts := ts_csl_first + (float64 (i + 1) * interval) + + fmt.Printf ("Building data for interval ]%g-%g].\n", ts - interval, ts) + + ret_data.DataPoints[i] = obj.ConsolidatePointAverage (ts - interval, ts) + } + + return ret_data +} /* ConsolidateAverage */ + +func (obj *OTS_TimeSeries) Print () { + for i := 0; i < len (obj.DataPoints); i++ { + data_point := obj.DataPoints[i] + fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate) + } +} /* Print () */ + +/* vim: set syntax=go sw=2 sts=2 et : */ diff --git a/ots_timeseries_test.go b/ots_timeseries_test.go new file mode 100644 index 0000000..8df850e --- /dev/null +++ b/ots_timeseries_test.go @@ -0,0 +1,70 @@ +package otsdb + +import ( + "math" + "testing" +) + +type consolidatePointAverageTest struct { + tsStart float64 + tsEnd float64 + rate float64 +} + +var consolidatePointAverageTestData = []OTS_DataPoint { + { 0.0, 0.0}, + { 10.0, 2.0}, + { 20.0, 4.0}, + { 30.0, 8.0}, + { 40.0, 16.0}, + { 50.0, 32.0}, + { 60.0, 64.0}, + { 70.0, 96.0}, + { 80.0, 96.0}, + { 90.0, 0.0}, + {100.0, 0.0}, + {110.0, 0.0}, + {120.0, 0.0}, +} + +var consolidatePointAverageTests = []consolidatePointAverageTest { + /* Timespan borders align with datapoints. This is the easiest case. */ + consolidatePointAverageTest{40.0, 60.0, 48.0}, + consolidatePointAverageTest{40.0, 50.0, 32.0}, + /* Timespan borders between datapoints. */ + consolidatePointAverageTest{35.0, 45.0, 24.0}, + consolidatePointAverageTest{ 7.0, 27.0, 5.1}, + consolidatePointAverageTest{17.0, 42.0, 12.64}, + /* No datapoints within timespan. */ + consolidatePointAverageTest{23.0, 28.0, 8.0}, + /* Beginning before first datapoint */ + consolidatePointAverageTest{-8.0, 24.0, 2.875}, + /* End after last datapoint */ + consolidatePointAverageTest{60.0, 180.0, 32.0}, + /* Start and end inversed */ + consolidatePointAverageTest{27.0, 7.0, 5.1}, +} + +func FloatsDiffer (a, b float64) bool { + if math.Fabs (a - b) > 1.0e-6 { + return true + } + return false +} + +func TestConsolidatePointAverage (t *testing.T) { + obj := new (OTS_TimeSeries) + obj.DataPoints = consolidatePointAverageTestData + + for i := 0; i < len (consolidatePointAverageTests); i++ { + testCase := consolidatePointAverageTests[i] + + dp := obj.ConsolidatePointAverage (testCase.tsStart, testCase.tsEnd); + if FloatsDiffer (dp.Rate, testCase.rate) { + t.Errorf ("ConsolidatePointAverage (%g, %g) failed: Expected %g, got %g", + testCase.tsStart, testCase.tsEnd, testCase.rate, dp.Rate); + } + } +} + +/* vim: set syntax=go sw=2 sts=2 et : */ -- 2.11.0