diff --git a/internal/domains/sensors/handlers.go b/internal/domains/sensors/handlers.go index ab3248b..98313db 100644 --- a/internal/domains/sensors/handlers.go +++ b/internal/domains/sensors/handlers.go @@ -15,6 +15,7 @@ const ( subjectSensorsGet = "sensors.get" subjectSensorsValuesGet = "sensors.values.get" subjectSensorsList = "sensors.list" + subjectSensorsData = "sensors.data." ) type Handlers struct { @@ -48,8 +49,23 @@ func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, erro msg.Respond(response) } +func handlePublish[Req any](msg *nats.Msg, handler func(Req) error) { + var req Req + if len(msg.Data) > 0 { + if err := json.Unmarshal(msg.Data, &req); err != nil { + slog.Error("failed to unmarshal message", "error", err) + return + } + } + + if err := handler(req); err != nil { + slog.Error("handler error", "error", err) + } +} + func (h *Handlers) SetupEndpoints() *Handlers { h.register() + h.registerData() h.update() h.get() h.getValues() @@ -74,6 +90,25 @@ func (h *Handlers) register() { }) } +func (h *Handlers) registerData() { + h.NATS.Subscribe(subjectSensorsData+"*", func(msg *nats.Msg) { + handlePublish(msg, func(data SensorData) error { + if err := data.Validate(); err != nil { + slog.Error("error validating sensor data", "error", err) + return err + } + + if err := h.service.RegisterSensorData(data); err != nil { + slog.Error("failed to save sensor data", "error", err, "sensor_id", data.SensorID) + return err + } + + slog.Debug("sensor data saved", "sensor_id", data.SensorID, "value", data.Value) + return nil + }) + }) +} + func (h *Handlers) update() { h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) { handleRequest(msg, func(req Sensor) (Sensor, error) { diff --git a/internal/domains/sensors/repository.go b/internal/domains/sensors/repository.go index d53ae90..2d1b3d2 100644 --- a/internal/domains/sensors/repository.go +++ b/internal/domains/sensors/repository.go @@ -35,8 +35,11 @@ func (p *pgxRepo) CreateSensor(s Sensor) error { return err } +const createSensorDataQuery = `insert into registry (sensor_id, value, created_at) values ($1, $2, $3)` + func (p *pgxRepo) CreateSensorData(s SensorData) error { - return nil + _, err := p.Exec(context.Background(), createSensorDataQuery, s.SensorID, s.Value, s.Timestamp) + return err } func (p *pgxRepo) UpdateSensor(s Sensor) error { @@ -106,7 +109,32 @@ func (i *inMemory) CreateSensor(s Sensor) error { return nil } -func (p *inMemory) CreateSensorData(s SensorData) error { +func (i *inMemory) CreateSensorData(data SensorData) error { + i.mu.Lock() + defer i.mu.Unlock() + + sensor, exists := i.sensors[data.SensorID] + if !exists { + return ErrSensorNotFound + } + + if sensor.SensorData == nil { + sensor.SensorData = &map[int]SensorData{} + } + + key := int(data.Timestamp.Unix()) + (*sensor.SensorData)[key] = data + + if len(*sensor.SensorData) > 100 { + oldestKey := key + for k := range *sensor.SensorData { + if k < oldestKey { + oldestKey = k + } + } + delete(*sensor.SensorData, oldestKey) + } + return nil } @@ -169,7 +197,12 @@ func (d *DecoratorRepo) CreateSensor(s Sensor) error { return nil } -func (p *DecoratorRepo) CreateSensorData(s SensorData) error { +func (d *DecoratorRepo) CreateSensorData(data SensorData) error { + if err := d.db.CreateSensorData(data); err != nil { + return err + } + + _ = d.memory.CreateSensorData(data) return nil } diff --git a/internal/domains/sensors/service.go b/internal/domains/sensors/service.go index 365861c..e979520 100644 --- a/internal/domains/sensors/service.go +++ b/internal/domains/sensors/service.go @@ -26,6 +26,16 @@ func (s *Service) RegisterSensor(sensor Sensor) error { return nil } +func (s *Service) RegisterSensorData(data SensorData) error { + err := s.repo.CreateSensorData(data) + if err != nil { + slog.Error("error registering sensor data") + return err + } + + return nil +} + func (s *Service) UpdateSensor(sensor Sensor) error { s.repo.UpdateSensor(sensor)