package kraftakt import ( "context" "encoding/json" "fmt" "html/template" "io/ioutil" "net/http" "sync" "time" "github.com/octo/kraftakt/app" "github.com/octo/kraftakt/fitbit" "github.com/octo/kraftakt/gfit" "google.golang.org/appengine" "google.golang.org/appengine/datastore" "google.golang.org/appengine/delay" "google.golang.org/appengine/log" "google.golang.org/appengine/user" ) var delayedHandleNotifications = delay.Func("handleNotifications", handleNotifications) var templates *template.Template func init() { http.Handle("/login", AuthenticatedHandler(loginHandler)) http.Handle("/fitbit/connect", AuthenticatedHandler(fitbitConnectHandler)) http.Handle("/fitbit/grant", AuthenticatedHandler(fitbitGrantHandler)) http.Handle("/fitbit/disconnect", AuthenticatedHandler(fitbitDisconnectHandler)) http.Handle("/google/connect", AuthenticatedHandler(googleConnectHandler)) http.Handle("/google/grant", AuthenticatedHandler(googleGrantHandler)) http.Handle("/google/disconnect", AuthenticatedHandler(googleDisconnectHandler)) // unauthenticated http.Handle("/fitbit/notify", ContextHandler(fitbitNotifyHandler)) http.Handle("/", ContextHandler(indexHandler)) t, err := template.ParseGlob("templates/*.html") if err != nil { panic(err) } templates = t } func internalServerError(ctx context.Context, w http.ResponseWriter, err error) { log.Errorf(ctx, "%v", err) http.Error(w, "Internal Server Error\n\nReference: "+appengine.RequestID(ctx), http.StatusInternalServerError) } // ContextHandler implements http.Handler type ContextHandler func(context.Context, http.ResponseWriter, *http.Request) error func (hndl ContextHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := appengine.NewContext(r) if err := app.LoadConfig(ctx); err != nil { internalServerError(ctx, w, fmt.Errorf("LoadConfig() = %v", err)) return } if err := hndl(ctx, w, r); err != nil { internalServerError(ctx, w, err) return } } type AuthenticatedHandler func(context.Context, http.ResponseWriter, *http.Request, *app.User) error func (hndl AuthenticatedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := appengine.NewContext(r) if err := app.LoadConfig(ctx); err != nil { internalServerError(ctx, w, fmt.Errorf("LoadConfig() = %v", err)) return } gaeUser := user.Current(ctx) if gaeUser == nil { url, err := user.LoginURL(ctx, r.URL.String()) if err != nil { internalServerError(ctx, w, fmt.Errorf("LoginURL() = %v", err)) return } http.Redirect(w, r, url, http.StatusTemporaryRedirect) return } u, err := app.NewUser(ctx, gaeUser.Email) if err != nil { internalServerError(ctx, w, fmt.Errorf("NewUser(%q) = %v", gaeUser.Email, err)) return } if err := hndl(ctx, w, r, u); err != nil { internalServerError(ctx, w, err) return } } func indexHandler(ctx context.Context, w http.ResponseWriter, _ *http.Request) error { var templateData struct { HaveFitbit bool HaveGoogleFit bool *app.User } templateName := "main.html" if gaeUser := user.Current(ctx); gaeUser != nil { templateName = "loggedin.html" u, err := app.NewUser(ctx, gaeUser.Email) if err != nil { return err } templateData.User = u _, err = u.Token(ctx, "Fitbit") if err != nil && err != datastore.ErrNoSuchEntity { return err } templateData.HaveFitbit = (err == nil) _, err = u.Token(ctx, "Google") if err != nil && err != datastore.ErrNoSuchEntity { return err } templateData.HaveGoogleFit = (err == nil) } return templates.ExecuteTemplate(w, templateName, &templateData) } func loginHandler(_ context.Context, w http.ResponseWriter, r *http.Request, _ *app.User) error { // essentially a nop; all the heavy lifting (i.e. logging in) has been done by the AuthenticatedHandler wrapper. redirectURL := r.URL redirectURL.Path = "/" redirectURL.RawQuery = "" redirectURL.Fragment = "" http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) return nil } func fitbitConnectHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error { http.Redirect(w, r, fitbit.AuthURL(ctx, u), http.StatusTemporaryRedirect) return nil } func fitbitGrantHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error { if err := fitbit.ParseToken(ctx, r, u); err != nil { return err } c, err := fitbit.NewClient(ctx, "", u) if err != nil { return err } for _, collection := range []string{"activities", "sleep"} { if err := c.Subscribe(ctx, collection); err != nil { return fmt.Errorf("c.Subscribe(%q) = %v", collection, err) } log.Infof(ctx, "Successfully subscribed to %q", collection) } redirectURL := r.URL redirectURL.Path = "/" redirectURL.RawQuery = "" redirectURL.Fragment = "" http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) return nil } func fitbitDisconnectHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error { c, err := fitbit.NewClient(ctx, "", u) if err != nil { return err } if err := c.UnsubscribeAll(ctx); err != nil { return fmt.Errorf("UnsubscribeAll() = %v", err) } if err := c.DeleteToken(ctx); err != nil { return err } redirectURL := r.URL redirectURL.Path = "/" redirectURL.RawQuery = "" redirectURL.Fragment = "" http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) return nil } func googleConnectHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error { http.Redirect(w, r, gfit.AuthURL(ctx, u), http.StatusTemporaryRedirect) return nil } func googleGrantHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error { if err := gfit.ParseToken(ctx, r, u); err != nil { return err } redirectURL := r.URL redirectURL.Path = "/" redirectURL.RawQuery = "" redirectURL.Fragment = "" http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) return nil } func googleDisconnectHandler(ctx context.Context, w http.ResponseWriter, r *http.Request, u *app.User) error { c, err := gfit.NewClient(ctx, u) if err != nil { return err } if err := c.DeleteToken(ctx); err != nil { return err } redirectURL := r.URL redirectURL.Path = "/" redirectURL.RawQuery = "" redirectURL.Fragment = "" http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) return nil } // fitbitNotifyHandler is called by Fitbit whenever there are updates to a // subscription. It verifies the payload, splits it into individual // notifications and adds it to the taskqueue service. func fitbitNotifyHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() fitbitTimeout := 3 * time.Second ctx, cancel := context.WithTimeout(ctx, fitbitTimeout) defer cancel() // this is used when setting up a new subscriber in the UI. Once set // up, this code path should not be triggered. if verify := r.FormValue("verify"); verify != "" { if verify == app.Config.FitbitSubscriberCode { w.WriteHeader(http.StatusNoContent) } else { w.WriteHeader(http.StatusNotFound) } return nil } data, err := ioutil.ReadAll(r.Body) if err != nil { return err } // Fitbit recommendation: "If signature verification fails, you should // respond with a 404" if !fitbit.CheckSignature(ctx, data, r.Header.Get("X-Fitbit-Signature")) { log.Errorf(ctx, "signature mismatch") w.WriteHeader(http.StatusNotFound) return nil } if err := delayedHandleNotifications.Call(ctx, data); err != nil { return err } w.WriteHeader(http.StatusCreated) return nil } // handleNotifications parses fitbit notifications and requests the individual // activities from Fitbit. It is executed asynchronously via the delay package. func handleNotifications(ctx context.Context, payload []byte) error { log.Debugf(ctx, "NOTIFY -> %s", payload) if err := app.LoadConfig(ctx); err != nil { return err } var subscriptions []fitbit.Subscription if err := json.Unmarshal(payload, &subscriptions); err != nil { return err } wg := &sync.WaitGroup{} for _, s := range subscriptions { switch s.CollectionType { case "activities": wg.Add(1) go func(s fitbit.Subscription) { defer wg.Done() if err := activitiesNotification(ctx, &s); err != nil { log.Warningf(ctx, "activitiesNotification() = %v", err) } }(s) // copies s case "sleep": wg.Add(1) go func(s fitbit.Subscription) { defer wg.Done() if err := sleepNotification(ctx, &s); err != nil { log.Warningf(ctx, "sleepNotification() = %v", err) } }(s) // copies s default: log.Warningf(ctx, "ignoring collection type %q", s.CollectionType) } } wg.Wait() return nil } func activitiesNotification(ctx context.Context, s *fitbit.Subscription) error { u, err := fitbit.UserFromSubscriberID(ctx, s.SubscriptionID) if err != nil { return err } fitbitClient, err := fitbit.NewClient(ctx, s.OwnerID, u) if err != nil { return err } var ( wg = &sync.WaitGroup{} errs appengine.MultiError summary *fitbit.ActivitySummary profile *fitbit.Profile ) wg.Add(1) go func() { var err error summary, err = fitbitClient.ActivitySummary(ctx, s.Date) if err != nil { errs = append(errs, fmt.Errorf("fitbitClient.ActivitySummary(%q) = %v", s.Date, err)) } wg.Done() }() wg.Add(1) go func() { var err error profile, err = fitbitClient.Profile(ctx) if err != nil { errs = append(errs, fmt.Errorf("fitbitClient.Profile(%q) = %v", s.Date, err)) } wg.Done() }() wg.Wait() if len(errs) != 0 { return errs } tm, err := time.ParseInLocation("2006-01-02", s.Date, profile.Timezone) if err != nil { return err } log.Debugf(ctx, "%s (%s) took %d steps on %s", profile.Name, u.Email, summary.Summary.Steps, tm) gfitClient, err := gfit.NewClient(ctx, u) if err != nil { return err } wg.Add(1) go func() { if err := gfitClient.SetSteps(ctx, summary.Summary.Steps, tm); err != nil { errs = append(errs, fmt.Errorf("gfitClient.SetSteps(%d) = %v", summary.Summary.Steps, err)) } wg.Done() }() wg.Add(1) go func() { if err := gfitClient.SetCalories(ctx, summary.Summary.CaloriesOut, tm); err != nil { errs = append(errs, fmt.Errorf("gfitClient.SetCalories(%d) = %v", summary.Summary.CaloriesOut, err)) } wg.Done() }() wg.Add(1) go func() { defer wg.Done() var distanceMeters float64 for _, d := range summary.Summary.Distances { if d.Activity != "total" { continue } distanceMeters = 1000.0 * d.Distance break } if err := gfitClient.SetDistance(ctx, distanceMeters, tm); err != nil { errs = append(errs, fmt.Errorf("gfitClient.SetDistance(%g) = %v", distanceMeters, err)) return } }() wg.Add(1) go func() { if err := gfitClient.SetHeartRate(ctx, summary.Summary.HeartRateZones, summary.Summary.RestingHeartRate, tm); err != nil { errs = append(errs, fmt.Errorf("gfitClient.SetHeartRate() = %v", err)) } wg.Done() }() wg.Add(1) go func() { defer wg.Done() var activities []gfit.Activity for _, a := range summary.Activities { if !a.HasStartTime { continue } startTime, err := time.ParseInLocation("2006-01-02T15:04", a.StartDate+"T"+a.StartTime, profile.Timezone) if err != nil { errs = append(errs, fmt.Errorf("gfitClient.SetActivities() = %v", err)) return } endTime := startTime.Add(time.Duration(a.Duration) * time.Millisecond) activities = append(activities, gfit.Activity{ Start: startTime, End: endTime, Type: a.Name, }) } if err := gfitClient.SetActivities(ctx, activities, tm); err != nil { errs = append(errs, fmt.Errorf("gfitClient.SetActivities() = %v", err)) return } }() wg.Wait() if len(errs) != 0 { return errs } return nil } func sleepNotification(ctx context.Context, s *fitbit.Subscription) error { u, err := fitbit.UserFromSubscriberID(ctx, s.SubscriptionID) if err != nil { return err } var ( wg = &sync.WaitGroup{} gfitClient *gfit.Client gfitErr error ) wg.Add(1) go func() { gfitClient, gfitErr = gfit.NewClient(ctx, u) wg.Done() }() fitbitClient, err := fitbit.NewClient(ctx, s.OwnerID, u) if err != nil { return err } profile, err := fitbitClient.Profile(ctx) if err != nil { return err } tm, err := time.ParseInLocation("2006-01-02", s.Date, profile.Timezone) if err != nil { return err } sleep, err := fitbitClient.Sleep(ctx, tm) if err != nil { return err } log.Debugf(ctx, "fitbitClient.Sleep(%v) returned %d sleep stages", tm, len(sleep.Stages)) var activities []gfit.Activity for _, stg := range sleep.Stages { a := gfit.Activity{ Start: stg.StartTime, End: stg.EndTime, } switch stg.Level { case fitbit.SleepLevelDeep: a.Type = "Deep sleep" case fitbit.SleepLevelLight: a.Type = "Light sleep" case fitbit.SleepLevelREM: a.Type = "REM sleep" case fitbit.SleepLevelWake: a.Type = "Awake (during sleep cycle)" default: log.Warningf(ctx, "unexpected sleep level %v", stg.Level) continue } activities = append(activities, a) } wg.Wait() if gfitErr != nil { return gfitErr } log.Debugf(ctx, "passing %d activities to gfitClient.SetActivities()", len(activities)) if err := gfitClient.SetActivities(ctx, activities, tm); err != nil { return fmt.Errorf("SetActivities() = %v", err) } return nil }