package sensors

import (
	"context"
	"log/slog"
	"slices"
	"sync"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Repository is the storage contract for sensors and their readings.
type Repository interface {
	// CreateSensor stores a new sensor.
	CreateSensor(s Sensor) error
	// CreateSensorData stores one reading.
	CreateSensorData(data SensorData) error
	// UpdateSensor overwrites the type, sampling interval and thresholds of
	// the sensor identified by s.SensorID.
	UpdateSensor(s Sensor) error
	// ReadSensor returns the sensor with the given ID.
	ReadSensor(sensorID string) (Sensor, error)
	// ReadSensorValues returns the readings of sensorID whose timestamp lies
	// between from and to (both inclusive), newest first.
	ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error)
	// ReadAllSensors returns every stored sensor.
	ReadAllSensors() ([]Sensor, error)
}

// pgxRepo is the Repository implementation backed by a pgx connection pool.
type pgxRepo struct {
	*pgxpool.Pool
}

// newPGXRepo wraps pool in a Repository.
func newPGXRepo(pool *pgxpool.Pool) Repository {
	return &pgxRepo{Pool: pool}
}

const createSensorQuery = `insert into sensors (sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below) values ($1, $2, $3, $4, $5)`

// CreateSensor inserts s into the sensors table.
func (p *pgxRepo) CreateSensor(s Sensor) error {
	_, err := p.Exec(context.Background(), createSensorQuery,
		s.SensorID, string(s.SensorType), s.SamplingInterval, s.ThresholdAbove, s.ThresholdBelow)
	return err
}

const createSensorDataQuery = `insert into registry (sensor_id, value, created_at) values ($1, $2, $3)`

// CreateSensorData inserts one reading into the registry table.
func (p *pgxRepo) CreateSensorData(s SensorData) error {
	_, err := p.Exec(context.Background(), createSensorDataQuery, s.SensorID, s.Value, s.Timestamp)
	return err
}

const updateSensorQuery = `update sensors set sensor_type = $1, sampling_interval = $2, threshold_above = $3, threshold_below = $4 where sensor_id = $5`

// UpdateSensor updates the row whose sensor_id equals s.SensorID.
//
// It returns ErrSensorNotFound when no row matched, mirroring the in-memory
// implementation; any other failure is returned as reported by the pool.
func (p *pgxRepo) UpdateSensor(s Sensor) error {
	tag, err := p.Exec(context.Background(), updateSensorQuery,
		string(s.SensorType), s.SamplingInterval, s.ThresholdAbove, s.ThresholdBelow, s.SensorID)
	if err != nil {
		return err
	}
	// An UPDATE that matches nothing succeeds at the SQL level, so the
	// missing sensor has to be detected from the affected-row count.
	if tag.RowsAffected() == 0 {
		return ErrSensorNotFound
	}
	return nil
}

const readSensorBySensorID = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors where sensor_id = $1`

// ReadSensor loads the sensor with the given ID.
//
// NOTE(review): every query/scan failure is reported as ErrSensorNotFound,
// so connection or decoding problems are indistinguishable from a missing
// row. Narrowing this needs the driver's "no rows" sentinel, which this file
// does not import.
func (p *pgxRepo) ReadSensor(sensorID string) (Sensor, error) {
	var s Sensor
	err := p.QueryRow(context.Background(), readSensorBySensorID, sensorID).Scan(
		&s.SensorID,
		&s.SensorType,
		&s.SamplingInterval,
		&s.ThresholdAbove,
		&s.ThresholdBelow,
	)
	if err != nil {
		return Sensor{}, ErrSensorNotFound
	}
	return s, nil
}

const readSensorValuesBySensorID = `select sensor_id, value, created_at from registry where sensor_id = $1 and created_at >= $2 and created_at <= $3 order by created_at desc`

// ReadSensorValues returns the readings of sensorID with created_at between
// from and to (inclusive), newest first. The result is an empty, non-nil
// slice when nothing matches.
func (p *pgxRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
	rows, err := p.Query(context.Background(), readSensorValuesBySensorID, sensorID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	data := []SensorData{}
	for rows.Next() {
		var sd SensorData
		if err := rows.Scan(&sd.SensorID, &sd.Value, &sd.Timestamp); err != nil {
			return nil, err
		}
		data = append(data, sd)
	}
	// rows.Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return data, nil
}

const readAllSensorsQuery = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors order by created_at desc`

// ReadAllSensors returns every row of the sensors table ordered by
// created_at descending. The result is an empty, non-nil slice when the
// table is empty.
func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) {
	rows, err := p.Query(context.Background(), readAllSensorsQuery)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	sensors := []Sensor{}
	for rows.Next() {
		var s Sensor
		if err := rows.Scan(
			&s.SensorID,
			&s.SensorType,
			&s.SamplingInterval,
			&s.ThresholdAbove,
			&s.ThresholdBelow,
		); err != nil {
			return nil, err
		}
		sensors = append(sensors, s)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return sensors, nil
}

// inMemory is a mutex-guarded, map-backed Repository keyed by sensor ID.
type inMemory struct {
	sensors map[string]*Sensor
	mu      *sync.Mutex
}

// newInMemoryRepo returns an empty in-memory Repository.
func newInMemoryRepo() Repository {
	return &inMemory{
		sensors: make(map[string]*Sensor),
		mu:      &sync.Mutex{},
	}
}

// CreateSensor stores a copy of s. It returns ErrSensorAlreadyExists when a
// sensor with the same ID is already present.
func (i *inMemory) CreateSensor(s Sensor) error {
	i.mu.Lock()
	defer i.mu.Unlock()

	if _, exists := i.sensors[s.SensorID]; exists {
		return ErrSensorAlreadyExists
	}
	sensorCopy := s
	i.sensors[s.SensorID] = &sensorCopy
	return nil
}

// CreateSensorData records a reading for an existing sensor, keyed by the
// reading's Unix-second timestamp, and keeps at most 100 readings per sensor
// by evicting the one with the smallest key. It returns ErrSensorNotFound
// when the sensor is unknown.
func (i *inMemory) CreateSensorData(data SensorData) error {
	// maxReadings is the per-sensor retention limit.
	const maxReadings = 100

	i.mu.Lock()
	defer i.mu.Unlock()

	sensor, exists := i.sensors[data.SensorID]
	if !exists {
		return ErrSensorNotFound
	}
	if sensor.SensorData == nil {
		sensor.SensorData = &map[int]SensorData{}
	}
	readings := *sensor.SensorData

	// Second granularity: a reading in the same second as an earlier one
	// replaces it.
	key := int(data.Timestamp.Unix())
	readings[key] = data

	if len(readings) > maxReadings {
		oldestKey := key
		for k := range readings {
			if k < oldestKey {
				oldestKey = k
			}
		}
		delete(readings, oldestKey)
	}
	return nil
}

// UpdateSensor overwrites the type, sampling interval and thresholds of the
// stored sensor, leaving its readings untouched. It returns
// ErrSensorNotFound when the sensor is unknown.
func (i *inMemory) UpdateSensor(s Sensor) error {
	i.mu.Lock()
	defer i.mu.Unlock()

	sensor, exists := i.sensors[s.SensorID]
	if !exists {
		return ErrSensorNotFound
	}
	sensor.SensorType = s.SensorType
	sensor.SamplingInterval = s.SamplingInterval
	sensor.ThresholdAbove = s.ThresholdAbove
	sensor.ThresholdBelow = s.ThresholdBelow
	return nil
}

// ReadSensor returns a copy of the stored sensor, or ErrSensorNotFound.
func (i *inMemory) ReadSensor(sensorID string) (Sensor, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	sensor, exists := i.sensors[sensorID]
	if !exists {
		return Sensor{}, ErrSensorNotFound
	}
	return *sensor, nil
}

// ReadSensorValues returns the stored readings of sensorID whose Unix-second
// key lies between from and to (inclusive), newest first so that the order
// matches the database-backed implementation. It returns ErrSensorNotFound
// when the sensor is unknown and an empty, non-nil slice when nothing
// matches.
func (i *inMemory) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	sensor, exists := i.sensors[sensorID]
	if !exists {
		return nil, ErrSensorNotFound
	}
	if sensor.SensorData == nil {
		return []SensorData{}, nil
	}
	readings := *sensor.SensorData

	fromUnix, toUnix := from.Unix(), to.Unix()
	keys := make([]int, 0, len(readings))
	for timestamp := range readings {
		if int64(timestamp) >= fromUnix && int64(timestamp) <= toUnix {
			keys = append(keys, timestamp)
		}
	}

	// Map iteration order is random; sort the keys descending for a
	// deterministic newest-first result.
	slices.Sort(keys)
	slices.Reverse(keys)

	data := make([]SensorData, 0, len(keys))
	for _, timestamp := range keys {
		data = append(data, readings[timestamp])
	}
	return data, nil
}

// ReadAllSensors returns copies of all stored sensors in unspecified order.
func (i *inMemory) ReadAllSensors() ([]Sensor, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	sensors := make([]Sensor, 0, len(i.sensors))
	for _, s := range i.sensors {
		sensors = append(sensors, *s)
	}
	return sensors, nil
}

// DecoratorRepo is a Repository that writes to the database first and then
// mirrors the write into an in-memory repository, and that tries the
// in-memory repository first on reads.
type DecoratorRepo struct {
	db     Repository
	memory Repository
}

// NewDecoratorRepo builds a DecoratorRepo on top of pool and pre-loads the
// in-memory side with every sensor currently in the database. A failed
// pre-load is logged and otherwise ignored; the repository then starts with
// an empty in-memory side.
func NewDecoratorRepo(pool *pgxpool.Pool) Repository {
	db := newPGXRepo(pool)
	memory := newInMemoryRepo()

	sensors, err := db.ReadAllSensors()
	if err != nil {
		slog.Error("error warming up cache", "err", err)
	}
	for _, s := range sensors {
		// Best effort: a sensor that cannot be cached is still served from
		// the database.
		_ = memory.CreateSensor(s)
	}

	return &DecoratorRepo{
		db:     db,
		memory: memory,
	}
}

// CreateSensor stores s in the database and, on success, in memory.
func (d *DecoratorRepo) CreateSensor(s Sensor) error {
	if err := d.db.CreateSensor(s); err != nil {
		return err
	}
	// Best effort: the database write already succeeded.
	_ = d.memory.CreateSensor(s)
	return nil
}

// CreateSensorData stores the reading in the database and, on success, in
// memory.
func (d *DecoratorRepo) CreateSensorData(data SensorData) error {
	if err := d.db.CreateSensorData(data); err != nil {
		return err
	}
	// Best effort: the database write already succeeded.
	_ = d.memory.CreateSensorData(data)
	return nil
}

// UpdateSensor updates the sensor in the database and, on success, in memory.
func (d *DecoratorRepo) UpdateSensor(s Sensor) error {
	if err := d.db.UpdateSensor(s); err != nil {
		return err
	}
	// Best effort: the database write already succeeded.
	_ = d.memory.UpdateSensor(s)
	return nil
}

// ReadSensor returns the sensor from memory, falling back to the database
// when the in-memory lookup fails.
func (d *DecoratorRepo) ReadSensor(sensorID string) (Sensor, error) {
	sensor, err := d.memory.ReadSensor(sensorID)
	if err == nil {
		return sensor, nil
	}
	return d.db.ReadSensor(sensorID)
}

// ReadSensorValues returns the readings of sensorID between from and to.
//
// The in-memory result is used only when it is non-empty and its oldest
// reading is not after from; in every other case the database is queried,
// because the in-memory side may not reach back as far as from.
func (d *DecoratorRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
	values, err := d.memory.ReadSensorValues(sensorID, from, to)
	if err != nil || len(values) == 0 {
		return d.db.ReadSensorValues(sensorID, from, to)
	}

	var oldestTimestamp *time.Time
	for _, v := range values {
		if oldestTimestamp == nil || v.Timestamp.Before(*oldestTimestamp) {
			oldestTimestamp = v.Timestamp
		}
	}
	if oldestTimestamp != nil && oldestTimestamp.After(from) {
		return d.db.ReadSensorValues(sensorID, from, to)
	}
	return values, nil
}

// ReadAllSensors returns the sensors held in memory. When memory is empty or
// fails, it reads them from the database and copies them into memory.
func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {
	sensors, err := d.memory.ReadAllSensors()
	if err == nil && len(sensors) > 0 {
		return sensors, nil
	}

	sensors, err = d.db.ReadAllSensors()
	if err != nil {
		return []Sensor{}, err
	}
	for _, s := range sensors {
		// Best effort: sensors already present in memory are skipped.
		_ = d.memory.CreateSensor(s)
	}
	return sensors, nil
}