nats-app/internal/domains/sensors/repository.go

211 lines
4.4 KiB
Go

package sensors
import (
"context"
"log/slog"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Repository interface {
CreateSensor(s Sensor) error
CreateSensorData(data SensorData) error
UpdateSensor(s Sensor) error
ReadSensor(sensorID string) (Sensor, error)
ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error)
ReadAllSensors() ([]Sensor, error)
}
type pgxRepo struct {
*pgxpool.Pool
}
func newPGXRepo(pool *pgxpool.Pool) Repository {
return &pgxRepo{
pool,
}
}
const createSensorQuery = `insert into sensors (sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below) values ($1, $2, $3, $4, $5)`
func (p *pgxRepo) CreateSensor(s Sensor) error {
_, err := p.Exec(context.Background(), createSensorQuery, s.SensorID, string(s.SensorType), s.SamplingInterval, s.ThresholdAbove, s.ThresholdBelow)
return err
}
func (p *pgxRepo) CreateSensorData(s SensorData) error {
return nil
}
func (p *pgxRepo) UpdateSensor(s Sensor) error {
panic("unimplemented")
}
func (p *pgxRepo) ReadSensor(sensorID string) (Sensor, error) {
panic("unimplemented")
}
func (p *pgxRepo) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
panic("unimplemented")
}
const readAllSensorsQuery = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors order by created_at desc`
func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) {
rows, err := p.Query(context.Background(), readAllSensorsQuery)
if err != nil {
return nil, err
}
defer rows.Close()
sensors := []Sensor{}
for rows.Next() {
var s Sensor
if err := rows.Scan(
&s.SensorID,
&s.SensorType,
&s.SamplingInterval,
&s.ThresholdAbove,
&s.ThresholdBelow,
); err != nil {
return nil, err
}
sensors = append(sensors, s)
}
if err := rows.Err(); err != nil {
return nil, err
}
return sensors, nil
}
type inMemory struct {
sensors map[string]*Sensor
mu *sync.Mutex
}
func newInMemoryRepo() Repository {
return &inMemory{
sensors: make(map[string]*Sensor),
mu: &sync.Mutex{},
}
}
func (i *inMemory) CreateSensor(s Sensor) error {
i.mu.Lock()
defer i.mu.Unlock()
if _, exists := i.sensors[s.SensorID]; exists {
return ErrSensorAlreadyExists
}
sensorCopy := s
i.sensors[s.SensorID] = &sensorCopy
return nil
}
func (p *inMemory) CreateSensorData(s SensorData) error {
return nil
}
func (i *inMemory) UpdateSensor(s Sensor) error {
panic("unimplemented")
}
func (i *inMemory) ReadSensor(sensorID string) (Sensor, error) {
panic("unimplemented")
}
func (i *inMemory) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
// holds only last 100 values for every sensor
panic("unimplemented")
}
func (i *inMemory) ReadAllSensors() ([]Sensor, error) {
i.mu.Lock()
defer i.mu.Unlock()
sensors := make([]Sensor, 0, len(i.sensors))
for _, s := range i.sensors {
sensors = append(sensors, *s)
}
return sensors, nil
}
type DecoratorRepo struct {
db Repository
memory Repository
}
func NewDecoratorRepo(pool *pgxpool.Pool) Repository {
db := newPGXRepo(pool)
memory := newInMemoryRepo()
sensors, err := db.ReadAllSensors()
if err != nil {
slog.Error("error warming up cache")
}
for _, s := range sensors {
_ = memory.CreateSensor(s)
}
return &DecoratorRepo{
db: db,
memory: memory,
}
}
func (d *DecoratorRepo) CreateSensor(s Sensor) error {
if err := d.db.CreateSensor(s); err != nil {
return err
}
_ = d.memory.CreateSensor(s)
return nil
}
func (p *DecoratorRepo) CreateSensorData(s SensorData) error {
return nil
}
func (d *DecoratorRepo) UpdateSensor(s Sensor) error {
if err := d.db.UpdateSensor(s); err != nil {
return err
}
_ = d.memory.UpdateSensor(s)
return nil
}
func (d *DecoratorRepo) ReadSensor(sensorID string) (Sensor, error) {
sensor, err := d.memory.ReadSensor(sensorID)
if err == nil {
return sensor, nil
}
return d.db.ReadSensor(sensorID)
}
func (d *DecoratorRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
values, err := d.memory.ReadSensorValues(sensorID, from, to)
if err == nil && len(values) > 0 {
return values, nil
}
return d.db.ReadSensorValues(sensorID, from, to)
}
func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {
sensors, err := d.memory.ReadAllSensors()
if err == nil && len(sensors) > 0 {
return sensors, nil
}
return d.db.ReadAllSensors()
}