244 lines
5.1 KiB
Go
244 lines
5.1 KiB
Go
package sensors
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
type Repository interface {
|
|
CreateSensor(s Sensor) error
|
|
CreateSensorData(data SensorData) error
|
|
UpdateSensor(s Sensor) error
|
|
ReadSensor(sensorID string) (Sensor, error)
|
|
ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error)
|
|
ReadAllSensors() ([]Sensor, error)
|
|
}
|
|
|
|
type pgxRepo struct {
|
|
*pgxpool.Pool
|
|
}
|
|
|
|
func newPGXRepo(pool *pgxpool.Pool) Repository {
|
|
return &pgxRepo{
|
|
pool,
|
|
}
|
|
}
|
|
|
|
const createSensorQuery = `insert into sensors (sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below) values ($1, $2, $3, $4, $5)`
|
|
|
|
func (p *pgxRepo) CreateSensor(s Sensor) error {
|
|
_, err := p.Exec(context.Background(), createSensorQuery, s.SensorID, string(s.SensorType), s.SamplingInterval, s.ThresholdAbove, s.ThresholdBelow)
|
|
return err
|
|
}
|
|
|
|
const createSensorDataQuery = `insert into registry (sensor_id, value, created_at) values ($1, $2, $3)`
|
|
|
|
func (p *pgxRepo) CreateSensorData(s SensorData) error {
|
|
_, err := p.Exec(context.Background(), createSensorDataQuery, s.SensorID, s.Value, s.Timestamp)
|
|
return err
|
|
}
|
|
|
|
func (p *pgxRepo) UpdateSensor(s Sensor) error {
|
|
panic("unimplemented")
|
|
}
|
|
|
|
func (p *pgxRepo) ReadSensor(sensorID string) (Sensor, error) {
|
|
panic("unimplemented")
|
|
}
|
|
|
|
func (p *pgxRepo) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
|
|
panic("unimplemented")
|
|
}
|
|
|
|
const readAllSensorsQuery = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors order by created_at desc`
|
|
|
|
func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) {
|
|
rows, err := p.Query(context.Background(), readAllSensorsQuery)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
sensors := []Sensor{}
|
|
for rows.Next() {
|
|
var s Sensor
|
|
if err := rows.Scan(
|
|
&s.SensorID,
|
|
&s.SensorType,
|
|
&s.SamplingInterval,
|
|
&s.ThresholdAbove,
|
|
&s.ThresholdBelow,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
sensors = append(sensors, s)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return sensors, nil
|
|
}
|
|
|
|
type inMemory struct {
|
|
sensors map[string]*Sensor
|
|
mu *sync.Mutex
|
|
}
|
|
|
|
func newInMemoryRepo() Repository {
|
|
return &inMemory{
|
|
sensors: make(map[string]*Sensor),
|
|
mu: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func (i *inMemory) CreateSensor(s Sensor) error {
|
|
i.mu.Lock()
|
|
defer i.mu.Unlock()
|
|
|
|
if _, exists := i.sensors[s.SensorID]; exists {
|
|
return ErrSensorAlreadyExists
|
|
}
|
|
|
|
sensorCopy := s
|
|
i.sensors[s.SensorID] = &sensorCopy
|
|
return nil
|
|
}
|
|
|
|
func (i *inMemory) CreateSensorData(data SensorData) error {
|
|
i.mu.Lock()
|
|
defer i.mu.Unlock()
|
|
|
|
sensor, exists := i.sensors[data.SensorID]
|
|
if !exists {
|
|
return ErrSensorNotFound
|
|
}
|
|
|
|
if sensor.SensorData == nil {
|
|
sensor.SensorData = &map[int]SensorData{}
|
|
}
|
|
|
|
key := int(data.Timestamp.Unix())
|
|
(*sensor.SensorData)[key] = data
|
|
|
|
if len(*sensor.SensorData) > 100 {
|
|
oldestKey := key
|
|
for k := range *sensor.SensorData {
|
|
if k < oldestKey {
|
|
oldestKey = k
|
|
}
|
|
}
|
|
delete(*sensor.SensorData, oldestKey)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *inMemory) UpdateSensor(s Sensor) error {
|
|
panic("unimplemented")
|
|
}
|
|
|
|
func (i *inMemory) ReadSensor(sensorID string) (Sensor, error) {
|
|
panic("unimplemented")
|
|
}
|
|
|
|
func (i *inMemory) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
|
|
// holds only last 100 values for every sensor
|
|
|
|
panic("unimplemented")
|
|
}
|
|
|
|
func (i *inMemory) ReadAllSensors() ([]Sensor, error) {
|
|
i.mu.Lock()
|
|
defer i.mu.Unlock()
|
|
|
|
sensors := make([]Sensor, 0, len(i.sensors))
|
|
for _, s := range i.sensors {
|
|
sensors = append(sensors, *s)
|
|
}
|
|
return sensors, nil
|
|
}
|
|
|
|
type DecoratorRepo struct {
|
|
db Repository
|
|
memory Repository
|
|
}
|
|
|
|
func NewDecoratorRepo(pool *pgxpool.Pool) Repository {
|
|
|
|
db := newPGXRepo(pool)
|
|
memory := newInMemoryRepo()
|
|
|
|
sensors, err := db.ReadAllSensors()
|
|
if err != nil {
|
|
slog.Error("error warming up cache")
|
|
}
|
|
|
|
for _, s := range sensors {
|
|
_ = memory.CreateSensor(s)
|
|
}
|
|
|
|
return &DecoratorRepo{
|
|
db: db,
|
|
memory: memory,
|
|
}
|
|
}
|
|
|
|
func (d *DecoratorRepo) CreateSensor(s Sensor) error {
|
|
if err := d.db.CreateSensor(s); err != nil {
|
|
return err
|
|
}
|
|
|
|
_ = d.memory.CreateSensor(s)
|
|
return nil
|
|
}
|
|
|
|
func (d *DecoratorRepo) CreateSensorData(data SensorData) error {
|
|
if err := d.db.CreateSensorData(data); err != nil {
|
|
return err
|
|
}
|
|
|
|
_ = d.memory.CreateSensorData(data)
|
|
return nil
|
|
}
|
|
|
|
func (d *DecoratorRepo) UpdateSensor(s Sensor) error {
|
|
if err := d.db.UpdateSensor(s); err != nil {
|
|
return err
|
|
}
|
|
|
|
_ = d.memory.UpdateSensor(s)
|
|
return nil
|
|
}
|
|
|
|
func (d *DecoratorRepo) ReadSensor(sensorID string) (Sensor, error) {
|
|
sensor, err := d.memory.ReadSensor(sensorID)
|
|
if err == nil {
|
|
return sensor, nil
|
|
}
|
|
|
|
return d.db.ReadSensor(sensorID)
|
|
}
|
|
|
|
func (d *DecoratorRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
|
|
values, err := d.memory.ReadSensorValues(sensorID, from, to)
|
|
if err == nil && len(values) > 0 {
|
|
return values, nil
|
|
}
|
|
|
|
return d.db.ReadSensorValues(sensorID, from, to)
|
|
}
|
|
|
|
func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {
|
|
sensors, err := d.memory.ReadAllSensors()
|
|
if err == nil && len(sensors) > 0 {
|
|
return sensors, nil
|
|
}
|
|
|
|
return d.db.ReadAllSensors()
|
|
}
|