add ristretto caching and process daily and rolling data
This commit is contained in:
parent
6f4090fcb3
commit
145028af37
@ -8,3 +8,10 @@ require (
|
|||||||
github.com/cenkalti/backoff/v5 v5.0.3
|
github.com/cenkalti/backoff/v5 v5.0.3
|
||||||
pkg v0.0.0-00010101000000-000000000000
|
pkg v0.0.0-00010101000000-000000000000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dgraph-io/ristretto/v2 v2.3.0 // indirect
|
||||||
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
|
golang.org/x/sys v0.35.0 // indirect
|
||||||
|
)
|
||||||
|
|||||||
@ -1,2 +1,10 @@
|
|||||||
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
|
github.com/dgraph-io/ristretto/v2 v2.3.0 h1:qTQ38m7oIyd4GAed/QkUZyPFNMnvVWyazGXRwvOt5zk=
|
||||||
|
github.com/dgraph-io/ristretto/v2 v2.3.0/go.mod h1:gpoRV3VzrEY1a9dWAYV6T1U7YzfgttXdd/ZzL1s9OZM=
|
||||||
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
|
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||||
|
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
|
|||||||
@ -2,9 +2,19 @@ package meteo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MeteoDataFromServiceA struct {
|
||||||
|
Timestamp time.Time `csv:"fecha" json:"timestamp"`
|
||||||
|
Location string `csv:"ciudad" json:"location"`
|
||||||
|
MaxTemp float32 `csv:"temperatura maxima" json:"max_temp"`
|
||||||
|
MinTemp float32 `csv:"temperatura minima" json:"min_temp"`
|
||||||
|
Rainfall float32 `csv:"precipitacion" json:"rainfall"`
|
||||||
|
Cloudiness int `csv:"nubosidad" json:"cloudiness"`
|
||||||
|
}
|
||||||
|
|
||||||
type MeteoDataPerDay struct {
|
type MeteoDataPerDay struct {
|
||||||
MaxTemp float32 `json:"max_temp"`
|
MaxTemp float32 `json:"max_temp"`
|
||||||
MinTemp float32 `json:"min_temp"`
|
MinTemp float32 `json:"min_temp"`
|
||||||
@ -33,6 +43,10 @@ type MeteoData struct {
|
|||||||
Rolling7 *Rolling7Data `json:"rolling7,omitempty"`
|
Rolling7 *Rolling7Data `json:"rolling7,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mt *MeteoData) ComputeCacheKey() string {
|
||||||
|
return fmt.Sprintf("meteo:%s:%s", mt.Location, mt.From)
|
||||||
|
}
|
||||||
|
|
||||||
type Unit string
|
type Unit string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@ -2,33 +2,42 @@ package meteo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v5"
|
"github.com/cenkalti/backoff/v5"
|
||||||
|
"github.com/dgraph-io/ristretto/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type inMemory struct {
|
|
||||||
data any
|
|
||||||
mu *sync.RWMutex
|
|
||||||
expiry time.Time
|
|
||||||
}
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
inMemory
|
cache *ristretto.Cache[string, string]
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService() *Service {
|
func NewService() *Service {
|
||||||
return &Service{}
|
cache, err := ristretto.NewCache(&ristretto.Config[string, string]{
|
||||||
|
NumCounters: 1024,
|
||||||
|
MaxCost: 1 << 30,
|
||||||
|
BufferItems: 64,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("cannot init cache", "err", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Service{
|
||||||
|
cache: cache,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetWeatherByCity(ctx context.Context, params GetMeteoData) ([]MeteoData, error) {
|
func (s *Service) GetWeatherByCity(ctx context.Context, params GetMeteoData) (MeteoData, error) {
|
||||||
fromDate, err := time.Parse("2006-01-02", params.Date)
|
fromDate, err := time.Parse("2006-01-02", params.Date)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return MeteoData{}, err
|
||||||
}
|
}
|
||||||
toDate := fromDate.AddDate(0, 0, params.Days-1)
|
toDate := fromDate.AddDate(0, 0, params.Days-1)
|
||||||
|
|
||||||
@ -36,13 +45,15 @@ func (s *Service) GetWeatherByCity(ctx context.Context, params GetMeteoData) ([]
|
|||||||
url := fmt.Sprintf("http://localhost:8080/data?city=%s&from=%s&to=%s",
|
url := fmt.Sprintf("http://localhost:8080/data?city=%s&from=%s&to=%s",
|
||||||
params.Location, params.Date, toDate.Format("2006-01-02"))
|
params.Location, params.Date, toDate.Format("2006-01-02"))
|
||||||
|
|
||||||
|
slog.Info("url", "url", url)
|
||||||
|
|
||||||
resp, err := http.Get(url)
|
resp, err := http.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusBadRequest {
|
if resp.StatusCode == http.StatusBadRequest {
|
||||||
|
resp.Body.Close()
|
||||||
return nil, backoff.Permanent(errors.New("bad request"))
|
return nil, backoff.Permanent(errors.New("bad request"))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,11 +63,92 @@ func (s *Service) GetWeatherByCity(ctx context.Context, params GetMeteoData) ([]
|
|||||||
result, err := backoff.Retry(ctx, operation, backoff.WithBackOff(backoff.NewExponentialBackOff()))
|
result, err := backoff.Retry(ctx, operation, backoff.WithBackOff(backoff.NewExponentialBackOff()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("somethin happened")
|
slog.Error("somethin happened")
|
||||||
return []MeteoData{}, err
|
return MeteoData{}, err
|
||||||
}
|
}
|
||||||
// TODO add ristretto
|
defer result.Body.Close()
|
||||||
|
|
||||||
slog.Info("fetched data", "data", result)
|
body, err := io.ReadAll(result.Body)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("error reading response body", "err", err)
|
||||||
|
return MeteoData{}, err
|
||||||
|
}
|
||||||
|
|
||||||
return []MeteoData{}, nil
|
var serviceAResponse struct {
|
||||||
|
MeteoData []MeteoDataFromServiceA `json:"meteo_data"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &serviceAResponse); err != nil {
|
||||||
|
slog.Error("error unmarshaling response", "err", err)
|
||||||
|
return MeteoData{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(serviceAResponse.MeteoData) == 0 {
|
||||||
|
return MeteoData{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if params.Agg == AggDaily {
|
||||||
|
return s.processDailyData(serviceAResponse.MeteoData, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.processRolling7Data(serviceAResponse.MeteoData, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) processDailyData(data []MeteoDataFromServiceA, params GetMeteoData) (MeteoData, error) {
|
||||||
|
days := make([]MeteoDataPerDay, 0, len(data))
|
||||||
|
|
||||||
|
for _, d := range data {
|
||||||
|
avgTemp := (d.MaxTemp + d.MinTemp) / 2
|
||||||
|
day := MeteoDataPerDay{
|
||||||
|
MaxTemp: d.MaxTemp,
|
||||||
|
MinTemp: d.MinTemp,
|
||||||
|
AvgTemp: avgTemp,
|
||||||
|
Rainfall: d.Rainfall,
|
||||||
|
Cloudiness: d.Cloudiness,
|
||||||
|
}
|
||||||
|
|
||||||
|
if params.Unit == UnitF {
|
||||||
|
day.ConvertValue()
|
||||||
|
}
|
||||||
|
|
||||||
|
days = append(days, day)
|
||||||
|
}
|
||||||
|
|
||||||
|
return MeteoData{
|
||||||
|
Location: params.Location,
|
||||||
|
Unit: params.Unit,
|
||||||
|
From: params.Date,
|
||||||
|
Days: &days,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) processRolling7Data(data []MeteoDataFromServiceA, params GetMeteoData) (MeteoData, error) {
|
||||||
|
if len(data) < 7 {
|
||||||
|
return MeteoData{}, errors.New("insufficient data for rolling 7-day calculation")
|
||||||
|
}
|
||||||
|
|
||||||
|
var sumTemp, sumRainfall float32
|
||||||
|
var sumCloudiness int
|
||||||
|
|
||||||
|
for i := len(data) - 7; i < len(data); i++ {
|
||||||
|
avgTemp := (data[i].MaxTemp + data[i].MinTemp) / 2
|
||||||
|
sumTemp += avgTemp
|
||||||
|
sumRainfall += data[i].Rainfall
|
||||||
|
sumCloudiness += data[i].Cloudiness
|
||||||
|
}
|
||||||
|
|
||||||
|
rolling7 := &Rolling7Data{
|
||||||
|
AvgTemp: sumTemp / 7,
|
||||||
|
AvgCloudiness: sumCloudiness / 7,
|
||||||
|
SumRainfall: sumRainfall,
|
||||||
|
}
|
||||||
|
|
||||||
|
if params.Unit == UnitF {
|
||||||
|
rolling7.AvgTemp = rolling7.AvgTemp*9/5 + 32
|
||||||
|
}
|
||||||
|
|
||||||
|
return MeteoData{
|
||||||
|
Location: params.Location,
|
||||||
|
Unit: params.Unit,
|
||||||
|
From: params.Date,
|
||||||
|
Rolling7: rolling7,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user