Get the average consolidation working.
authorFlorian Forster <ff@octo.it>
Thu, 2 Jun 2011 16:37:31 +0000 (18:37 +0200)
committerFlorian Forster <ff@octo.it>
Thu, 2 Jun 2011 16:46:36 +0000 (18:46 +0200)
Also adds a unit test for the central working-horse function,
ConsolidatePointAverage().

Makefile [new file with mode: 0644]
compact.go [deleted file]
ots_timeseries.go [new file with mode: 0644]
ots_timeseries_test.go [new file with mode: 0644]

diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..3bf9a44
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,8 @@
+GOROOT = /usr/lib/go
+
+include $(GOROOT)/src/Make.inc
+
+TARG    = ots_timeseries
+GOFILES = ots_timeseries.go\
+
+include $(GOROOT)/src/Make.pkg
diff --git a/compact.go b/compact.go
deleted file mode 100644 (file)
index 0698f68..0000000
+++ /dev/null
@@ -1,161 +0,0 @@
-package main
-
-import (
-  "fmt"
-  "os"
-)
-
-type OTS_DataPoint struct {
-  TimeStamp float64
-  Rate      float64
-}
-
-type OTS_Data struct {
-  TSData []OTS_DataPoint
-}
-
-/* Functions for the sort interface. */
-func (obj *OTS_Data) Len () int {
-  return (len (obj.TSData))
-}
-
-func (obj *OTS_Data) Less (i, j int) bool {
-  if obj.TSData[i].TimeStamp < obj.TSData[j].TimeStamp {
-    return true
-  }
-  return false
-}
-
-func (obj *OTS_Data) Swap (i, j int) {
-  tmp := obj.TSData[i]
-  obj.TSData[i] = obj.TSData[j]
-  obj.TSData[j] = tmp
-}
-
-func Fmod64 (a float64, b float64) float64 {
-  tmp := int (a / b)
-  return b * float64 (tmp)
-}
-
-func (obj *OTS_Data) Write (name string) os.Error {
-  fd, err := os.OpenFile(name, os.O_WRONLY, 0666)
-  if err != nil {
-    return err
-  }
-
-  for i := 0; i < len (obj.TSData); i++ {
-    data_point := obj.TSData[i]
-    str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate)
-
-    fd.WriteString (str)
-  }
-
-  fd.Close ()
-  return nil
-}
-
-func ReadFile (name string) (obj *OTS_Data, err os.Error) {
-}
-
-func (raw_data *OTS_Data) Consolidate (interval float64) *OTS_Data {
-  if interval <= 0.0 {
-    return nil
-  }
-
-  ts_raw_first := raw_data.TSData[0].TimeStamp
-  ts_raw_last  := ts_raw_first
-
-  /* Determine the first and last data point.
-   * XXX: In the future, this should be a sorted list! */
-  for i := 1; i < len (raw_data.TSData); i++ {
-    data_point := raw_data.TSData[i]
-
-    if ts_raw_first > data_point.TimeStamp {
-      ts_raw_first = data_point.TimeStamp
-    }
-
-    if ts_raw_last < data_point.TimeStamp {
-      ts_raw_last = data_point.TimeStamp
-    }
-  }
-
-  fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n",
-      ts_raw_first, ts_raw_last)
-
-  /* Determine the timespan the consolidated data will span. */
-  ts_csl_first := Fmod64 (ts_raw_first, interval)
-  ts_csl_last  := Fmod64 (ts_raw_last,  interval)
-  if ts_csl_last < ts_raw_last {
-    ts_csl_last += interval
-  }
-
-  fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n",
-      ts_csl_first, ts_csl_last)
-
-  intervals_num := int ((ts_csl_last - ts_csl_first) / interval)
-  fmt.Printf ("Got a %gs timespan (%d intervals).\n",
-      ts_csl_last - ts_csl_first, intervals_num)
-
-  /* Allocate return structure */
-  ret_data := new (OTS_Data)
-  ret_data.TSData = make ([]OTS_DataPoint, intervals_num)
-
-  /* FIXME: This is currently a O(n^2) algorithm. It should instead be a O(n)
-   * algorithm. This is possible if raw_data is sorted (which, obviously, is a
-   * O(n log(n)) task). */
-  for i := 0; i < intervals_num; i++ {
-      ts := ts_csl_first + (float64 (i) * interval)
-      sum := 0.0
-      num := 0.0
-
-      fmt.Printf ("Building data for interval %g.\n", ts)
-
-      ret_data.TSData[i].TimeStamp = ts
-
-      for j := 0; j < len (raw_data.TSData); j++ {
-        data_point := raw_data.TSData[j]
-
-        if ((data_point.TimeStamp < ts) || (data_point.TimeStamp >= (ts + interval))) {
-          continue
-        }
-
-        sum += data_point.Rate
-        num += 1.0
-      }
-
-      /* TODO: Be more clever about how this consolidated rate is computed. */
-      if num > 0.0 {
-        ret_data.TSData[i].Rate = sum / num
-      }
-  }
-
-  return ret_data
-}
-
-func (obj *OTS_Data) Print () {
-  for i := 0; i < len (obj.TSData); i++ {
-    data_point := obj.TSData[i]
-    fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate)
-  }
-} /* Print () */
-
-func main () {
-  var data_points []OTS_DataPoint
-  var raw_data     *OTS_Data
-  var new_data     *OTS_Data
-
-  data_points = []OTS_DataPoint {
-    {0.0,  1.0},
-    {1.0,  2.0},
-    {2.0,  5.0},
-    {3.0,  8.0},
-    {4.0,  0.0},
-    {5.0,  3.0}}
-
-  raw_data = new (OTS_Data)
-  raw_data.TSData = data_points
-
-  new_data = raw_data.Consolidate (2.0)
-
-  new_data.Print()
-}
diff --git a/ots_timeseries.go b/ots_timeseries.go
new file mode 100644 (file)
index 0000000..efa8931
--- /dev/null
@@ -0,0 +1,211 @@
+package otsdb
+
+import (
+  "fmt"
+  "math"
+  "os"
+  "sort"
+)
+
+type OTS_DataPoint struct {
+  TimeStamp float64
+  Rate      float64
+}
+
+type OTS_TimeSeries struct {
+  DataPoints []OTS_DataPoint
+}
+
+/* Functions for the sort interface. */
+func (obj *OTS_TimeSeries) Len () int {
+  return (len (obj.DataPoints))
+}
+
+func (obj *OTS_TimeSeries) Less (i, j int) bool {
+  if obj.DataPoints[i].TimeStamp < obj.DataPoints[j].TimeStamp {
+    return true
+  }
+  return false
+}
+
+func (obj *OTS_TimeSeries) Swap (i, j int) {
+  tmp := obj.DataPoints[i]
+  obj.DataPoints[i] = obj.DataPoints[j]
+  obj.DataPoints[j] = tmp
+}
+
+func Fmod64 (a float64, b float64) float64 {
+  tmp := int (a / b)
+  return b * float64 (tmp)
+}
+
+func (obj *OTS_TimeSeries) Write (name string) os.Error {
+  fd, err := os.OpenFile (name, os.O_WRONLY, 0666)
+  if err != nil {
+    return err
+  }
+
+  for i := 0; i < len (obj.DataPoints); i++ {
+    data_point := obj.DataPoints[i]
+    str := fmt.Sprintf ("%.3f,%g\n", data_point.TimeStamp, data_point.Rate)
+
+    fd.WriteString (str)
+  }
+
+  fd.Close ()
+  return nil
+}
+
+func ReadFile (name string) (obj *OTS_TimeSeries, err os.Error) {
+  fd, err := os.Open (name)
+  if err != nil {
+    return nil, err
+  }
+
+  /* dp_list := make ([]OTS_DataPoint, intervals_num */
+  obj = new (OTS_TimeSeries)
+
+  for ;; {
+    var timestamp float64
+    var rate float64
+
+    status, err := fmt.Fscanln (fd, "%f,%f", &timestamp, &rate)
+    if err != nil {
+      break
+    } else if status != 2 {
+      continue
+    }
+
+    fmt.Printf ("timestamp = %.3f; rate = %g;\n", timestamp, rate)
+
+    obj.DataPoints = append (obj.DataPoints, OTS_DataPoint{timestamp, rate})
+  }
+
+  fd.Close ()
+  return obj, nil
+}
+
+func (obj *OTS_TimeSeries) ConsolidatePointAverage (ts_start, ts_end float64) OTS_DataPoint {
+  var dp OTS_DataPoint
+
+  if ts_start > ts_end {
+    tmp := ts_end
+    ts_end = ts_start
+    ts_start = tmp
+  }
+
+  dp.TimeStamp = ts_end
+  dp.Rate = math.NaN ()
+
+  if len (obj.DataPoints) < 1 {
+    /* The object contains no data. */
+    return dp
+  } else if ts_start > obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp {
+    /* The timespan is after all the data in the object. */
+    return dp
+  } else if ts_end < obj.DataPoints[0].TimeStamp {
+    /* The timespan is before all the data in the object. */
+    return dp
+  }
+
+  /* Find the first rate _after_ the start of the interval. */
+  idx_start := sort.Search (len (obj.DataPoints), func (i int) bool {
+      if obj.DataPoints[i].TimeStamp > ts_start {
+        return true
+      }
+      return false
+  })
+
+  /* The start is outside of the range of the timestamp. With the above checks
+   * this means that the start is _before_ the data in the object. We can thus
+   * use the first elements in the slice. */
+  if idx_start >= len (obj.DataPoints) {
+    idx_start = 0
+  }
+
+  /* There is no data points _between_ ts_start and ts_end. Return the first
+   * measured rate _after_ the desired timespan as the rate of the timespan. */
+  if obj.DataPoints[idx_start].TimeStamp >= ts_end {
+    dp.Rate = obj.DataPoints[idx_start].Rate
+    return dp
+  }
+
+  var timespan_len float64 = 0.0
+  var timespan_sum float64 = 0.0
+  for i := idx_start; i < len (obj.DataPoints); i++ {
+    dp_ts_start := ts_start
+    if (i > 0) && (dp_ts_start < obj.DataPoints[i - 1].TimeStamp) {
+      dp_ts_start = obj.DataPoints[i - 1].TimeStamp
+    }
+    
+    dp_ts_end := obj.DataPoints[i].TimeStamp
+    if dp_ts_end > ts_end {
+      dp_ts_end = ts_end
+    }
+
+    dp_ts_diff := dp_ts_end - dp_ts_start
+    /* assert dp_ts_diff > 0.0 */
+    timespan_len += dp_ts_diff
+    timespan_sum += dp_ts_diff * obj.DataPoints[i].Rate
+
+    if obj.DataPoints[i].TimeStamp >= ts_end {
+      break;
+    }
+  } /* for i */
+
+  dp.Rate = timespan_sum / timespan_len
+  return dp
+} /* ConsolidatePointAverage */
+
+func (obj *OTS_TimeSeries) ConsolidateAverage (interval float64) *OTS_TimeSeries {
+  if interval <= 0.0 {
+    return nil
+  }
+
+  ts_raw_first := obj.DataPoints[0].TimeStamp
+  ts_raw_last  := obj.DataPoints[len (obj.DataPoints) - 1].TimeStamp
+
+  fmt.Printf ("ts_raw_first = %g; ts_raw_last = %g;\n",
+      ts_raw_first, ts_raw_last)
+
+  /* Determine the timespan the consolidated data will span. */
+  ts_csl_first := Fmod64 (ts_raw_first, interval)
+  ts_csl_last  := Fmod64 (ts_raw_last,  interval)
+  if ts_csl_first < ts_raw_first {
+    ts_csl_first += interval
+  }
+
+  fmt.Printf ("ts_csl_first = %g; ts_csl_last = %g;\n",
+      ts_csl_first, ts_csl_last)
+
+  intervals_num := int ((ts_csl_last - ts_csl_first) / interval)
+  fmt.Printf ("Got a %gs timespan (%d intervals).\n",
+      ts_csl_last - ts_csl_first, intervals_num)
+
+  /* Allocate return structure */
+  ret_data := new (OTS_TimeSeries)
+  ret_data.DataPoints = make ([]OTS_DataPoint, intervals_num)
+
+  /* FIXME: This is currently a O(n log(n)) algorithm. It should instead be a O(n)
+   * algorithm. This is possible since obj is sorted. The problem is that
+   * ConsolidatePointAverage() does a binary search when we actually know where
+   * to go in the array. */
+  for i := 0; i < intervals_num; i++ {
+      ts := ts_csl_first + (float64 (i + 1) * interval)
+
+      fmt.Printf ("Building data for interval ]%g-%g].\n", ts - interval, ts)
+
+      ret_data.DataPoints[i] = obj.ConsolidatePointAverage (ts - interval, ts)
+  }
+
+  return ret_data
+} /* ConsolidateAverage */
+
+func (obj *OTS_TimeSeries) Print () {
+  for i := 0; i < len (obj.DataPoints); i++ {
+    data_point := obj.DataPoints[i]
+    fmt.Printf ("[%g] %g\n", data_point.TimeStamp, data_point.Rate)
+  }
+} /* Print () */
+
+/* vim: set syntax=go sw=2 sts=2 et : */
diff --git a/ots_timeseries_test.go b/ots_timeseries_test.go
new file mode 100644 (file)
index 0000000..8df850e
--- /dev/null
@@ -0,0 +1,70 @@
+package otsdb
+
+import (
+  "math"
+  "testing"
+)
+
+type consolidatePointAverageTest struct {
+  tsStart float64
+  tsEnd   float64
+  rate    float64
+}
+
+var consolidatePointAverageTestData = []OTS_DataPoint {
+  {  0.0,  0.0},
+  { 10.0,  2.0},
+  { 20.0,  4.0},
+  { 30.0,  8.0},
+  { 40.0, 16.0},
+  { 50.0, 32.0},
+  { 60.0, 64.0},
+  { 70.0, 96.0},
+  { 80.0, 96.0},
+  { 90.0,  0.0},
+  {100.0,  0.0},
+  {110.0,  0.0},
+  {120.0,  0.0},
+}
+
+var consolidatePointAverageTests = []consolidatePointAverageTest {
+  /* Timespan borders align with datapoints. This is the easiest case. */
+  consolidatePointAverageTest{40.0,  60.0, 48.0},
+  consolidatePointAverageTest{40.0,  50.0, 32.0},
+  /* Timespan borders between datapoints. */
+  consolidatePointAverageTest{35.0,  45.0, 24.0},
+  consolidatePointAverageTest{ 7.0,  27.0,  5.1},
+  consolidatePointAverageTest{17.0,  42.0, 12.64},
+  /* No datapoints within timespan. */
+  consolidatePointAverageTest{23.0,  28.0,  8.0},
+  /* Beginning before first datapoint */
+  consolidatePointAverageTest{-8.0,  24.0,  2.875},
+  /* End after last datapoint */
+  consolidatePointAverageTest{60.0, 180.0, 32.0},
+  /* Start and end inversed */
+  consolidatePointAverageTest{27.0,   7.0,  5.1},
+}
+
+func FloatsDiffer (a, b float64) bool {
+  if math.Fabs (a - b) > 1.0e-6 {
+    return true
+  }
+  return false
+}
+
+func TestConsolidatePointAverage (t *testing.T) {
+  obj := new (OTS_TimeSeries)
+  obj.DataPoints = consolidatePointAverageTestData
+
+  for i := 0; i < len (consolidatePointAverageTests); i++ {
+    testCase := consolidatePointAverageTests[i]
+
+    dp := obj.ConsolidatePointAverage (testCase.tsStart, testCase.tsEnd);
+    if FloatsDiffer (dp.Rate, testCase.rate) {
+      t.Errorf ("ConsolidatePointAverage (%g, %g) failed: Expected %g, got %g",
+          testCase.tsStart, testCase.tsEnd, testCase.rate, dp.Rate);
+    }
+  }
+}
+
+/* vim: set syntax=go sw=2 sts=2 et : */