add read sensor values
This commit is contained in:
parent
f6d94bff1a
commit
65be69d164
@ -6,7 +6,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
@ -67,19 +66,28 @@ func (p *pgxRepo) ReadSensor(sensorID string) (Sensor, error) {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
const readSensorValuesBySensorID = ``
|
||||
const readSensorValuesBySensorID = `select sensor_id, value, created_at from registry where sensor_id = $1 and created_at >= $2 and created_at <= $3 order by created_at desc`
|
||||
|
||||
func (p *pgxRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
|
||||
fromPgType := pgtype.Timestamp{
|
||||
Time: from,
|
||||
Valid: true,
|
||||
rows, err := p.Query(context.Background(), readSensorValuesBySensorID, sensorID, from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
toPgType := pgtype.Timestamp{
|
||||
Time: to,
|
||||
Valid: true,
|
||||
defer rows.Close()
|
||||
|
||||
data := []SensorData{}
|
||||
for rows.Next() {
|
||||
var sd SensorData
|
||||
if err := rows.Scan(&sd.SensorID, &sd.Value, &sd.Timestamp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data = append(data, sd)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
panic("unimplemented")
|
||||
return data, nil
|
||||
}
|
||||
|
||||
const readAllSensorsQuery = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors order by created_at desc`
|
||||
@ -196,9 +204,29 @@ func (i *inMemory) ReadSensor(sensorID string) (Sensor, error) {
|
||||
}
|
||||
|
||||
func (i *inMemory) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
|
||||
// holds only last 100 values for every sensor
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
panic("unimplemented")
|
||||
sensor, exists := i.sensors[sensorID]
|
||||
if !exists {
|
||||
return nil, ErrSensorNotFound
|
||||
}
|
||||
|
||||
if sensor.SensorData == nil {
|
||||
return []SensorData{}, nil
|
||||
}
|
||||
|
||||
data := []SensorData{}
|
||||
fromUnix := from.Unix()
|
||||
toUnix := to.Unix()
|
||||
|
||||
for timestamp, sd := range *sensor.SensorData {
|
||||
if int64(timestamp) >= fromUnix && int64(timestamp) <= toUnix {
|
||||
data = append(data, sd)
|
||||
}
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (i *inMemory) ReadAllSensors() ([]Sensor, error) {
|
||||
@ -275,13 +303,24 @@ func (d *DecoratorRepo) ReadSensor(sensorID string) (Sensor, error) {
|
||||
|
||||
func (d *DecoratorRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
|
||||
values, err := d.memory.ReadSensorValues(sensorID, from, to)
|
||||
if err == nil && len(values) > 0 {
|
||||
return values, nil
|
||||
if err != nil || len(values) == 0 {
|
||||
return d.db.ReadSensorValues(sensorID, from, to)
|
||||
}
|
||||
|
||||
var oldestTimestamp *time.Time
|
||||
for _, v := range values {
|
||||
if oldestTimestamp == nil || v.Timestamp.Before(*oldestTimestamp) {
|
||||
oldestTimestamp = v.Timestamp
|
||||
}
|
||||
}
|
||||
|
||||
if oldestTimestamp != nil && oldestTimestamp.After(from) {
|
||||
return d.db.ReadSensorValues(sensorID, from, to)
|
||||
}
|
||||
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {
|
||||
var sensors []Sensor
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user