diff --git a/internal/domains/sensors/repository.go b/internal/domains/sensors/repository.go index 1b746ba..4f64d28 100644 --- a/internal/domains/sensors/repository.go +++ b/internal/domains/sensors/repository.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" ) @@ -67,19 +66,28 @@ func (p *pgxRepo) ReadSensor(sensorID string) (Sensor, error) { return s, nil } -const readSensorValuesBySensorID = `` +const readSensorValuesBySensorID = `select sensor_id, value, created_at from registry where sensor_id = $1 and created_at >= $2 and created_at <= $3 order by created_at desc` func (p *pgxRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) { - fromPgType := pgtype.Timestamp{ - Time: from, - Valid: true, + rows, err := p.Query(context.Background(), readSensorValuesBySensorID, sensorID, from, to) + if err != nil { + return nil, err } - toPgType := pgtype.Timestamp{ - Time: to, - Valid: true, + defer rows.Close() + + data := []SensorData{} + for rows.Next() { + var sd SensorData + if err := rows.Scan(&sd.SensorID, &sd.Value, &sd.Timestamp); err != nil { + return nil, err + } + data = append(data, sd) + } + if err := rows.Err(); err != nil { + return nil, err } - panic("unimplemented") + return data, nil } const readAllSensorsQuery = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors order by created_at desc` @@ -196,9 +204,29 @@ func (i *inMemory) ReadSensor(sensorID string) (Sensor, error) { } func (i *inMemory) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) { - // holds only last 100 values for every sensor + i.mu.Lock() + defer i.mu.Unlock() - panic("unimplemented") + sensor, exists := i.sensors[sensorID] + if !exists { + return nil, ErrSensorNotFound + } + + if sensor.SensorData == nil { + return []SensorData{}, nil + } + + data := []SensorData{} + fromUnix := from.Unix() + toUnix := to.Unix() + + for timestamp, sd := range *sensor.SensorData { + if int64(timestamp) >= fromUnix && int64(timestamp) <= toUnix { + data = append(data, sd) + } + } + + return data, nil } func (i *inMemory) ReadAllSensors() ([]Sensor, error) { @@ -275,11 +303,22 @@ func (d *DecoratorRepo) ReadSensor(sensorID string) (Sensor, error) { func (d *DecoratorRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) { values, err := d.memory.ReadSensorValues(sensorID, from, to) - if err == nil && len(values) > 0 { - return values, nil + if err != nil || len(values) == 0 { + return d.db.ReadSensorValues(sensorID, from, to) } - return d.db.ReadSensorValues(sensorID, from, to) + var oldestTimestamp *time.Time + for _, v := range values { + if oldestTimestamp == nil || v.Timestamp.Before(*oldestTimestamp) { + oldestTimestamp = v.Timestamp + } + } + + if oldestTimestamp != nil && oldestTimestamp.After(from) { + return d.db.ReadSensorValues(sensorID, from, to) + } + + return values, nil } func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {