diff --git a/internal/domains/sensors/domain.go b/internal/domains/sensors/domain.go index 51cba65..d58af79 100644 --- a/internal/domains/sensors/domain.go +++ b/internal/domains/sensors/domain.go @@ -106,6 +106,8 @@ func (r *SensorDataRequest) Validate() error { } var ( + ErrRegisteringSensor = errors.New("error registering sensor") + ErrUpdatingSensor = errors.New("error updating sensor") ErrInvalidSensorIdentifier = errors.New("sensor identifier is required") ErrInvalidSensorType = errors.New("sensor type is required") ErrSensorNotFound = errors.New("sensor not found") diff --git a/internal/domains/sensors/handlers.go b/internal/domains/sensors/handlers.go index 4328ca9..5c02bab 100644 --- a/internal/domains/sensors/handlers.go +++ b/internal/domains/sensors/handlers.go @@ -4,7 +4,6 @@ import ( "encoding/json" "log/slog" "nats-app/internal/iot" - "time" "github.com/nats-io/nats.go" ) @@ -93,11 +92,6 @@ func (h *Handlers) SetupEndpoints() *Handlers { func (h *Handlers) register() { h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) { handleRequest(msg, func(req Sensor) (Sensor, error) { - if err := req.Validate(); err != nil { - slog.Error("error validating sensor", "error", err) - return Sensor{}, err - } - if err := h.service.RegisterSensor(req); err != nil { return Sensor{}, err } @@ -112,18 +106,7 @@ func (h *Handlers) register() { func (h *Handlers) registerData() { h.NATS.Subscribe(subjectSensorsData+"*", func(msg *nats.Msg) { handlePublish(msg, func(data SensorData) error { - if err := data.Validate(); err != nil { - slog.Error("error validating sensor data", "error", err) - return err - } - - if err := h.service.RegisterSensorData(data); err != nil { - slog.Error("failed to save sensor data", "error", err, "sensor_id", data.SensorID) - return err - } - - slog.Debug("sensor data saved", "sensor_id", data.SensorID, "value", data.Value) - return nil + return h.service.RegisterSensorData(data) }) }) } @@ -131,12 +114,6 @@ func (h *Handlers) registerData() { func (h *Handlers) update() { h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) { handleRequest(msg, func(req Sensor) (Sensor, error) { - slog.Debug("calling sensor.update", "payload", req) - - if err := req.Validate(); err != nil { - return Sensor{}, err - } - if err := h.service.UpdateSensor(req); err != nil { return Sensor{}, err } @@ -151,13 +128,7 @@ func (h *Handlers) update() { func (h *Handlers) get() { h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) { handleRequest(msg, func(req SensorRequest) (Sensor, error) { - slog.Debug("calling sensor.get", "payload", req) - - if err := req.Validate(); err != nil { - return Sensor{}, err - } - - return h.service.GetSensor(req.SensorID) + return h.service.GetSensor(req) }) }) } @@ -165,21 +136,7 @@ func (h *Handlers) get() { func (h *Handlers) getValues() { h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) { handleRequest(msg, func(req SensorDataRequest) ([]SensorData, error) { - if err := req.Validate(); err != nil { - return []SensorData{}, err - } - - from, err := time.Parse(time.RFC3339, *req.From) - if err != nil { - return []SensorData{}, err - } - - to, err := time.Parse(time.RFC3339, *req.To) - if err != nil { - return []SensorData{}, err - } - - return h.service.GetValues(req.SensorID, from, to) + return h.service.GetValues(req) }) }) } diff --git a/internal/domains/sensors/service.go b/internal/domains/sensors/service.go index 769aec5..21a9167 100644 --- a/internal/domains/sensors/service.go +++ b/internal/domains/sensors/service.go @@ -2,6 +2,7 @@ package sensors import ( "log/slog" + "strings" "time" ) @@ -16,20 +17,32 @@ func NewService(repo Repository) *Service { } func (s *Service) RegisterSensor(sensor Sensor) error { + if err := sensor.Validate(); err != nil { + slog.Error("error validating sensor", "error", err) + return err + } err := s.repo.CreateSensor(sensor) if err != nil { slog.Error("error registering sensor", "error", err) - return err + if strings.Contains(err.Error(), "duplicate key value") { + return ErrSensorAlreadyExists + } + return ErrRegisteringSensor } return nil } func (s *Service) RegisterSensorData(data SensorData) error { + if err := data.Validate(); err != nil { + slog.Error("error validating sensor data", "error", err) + return err + } + err := s.repo.CreateSensorData(data) if err != nil { - slog.Error("error registering sensor data") + slog.Error("error registering sensor data", "error", err) return err } @@ -37,15 +50,51 @@ func (s *Service) RegisterSensorData(data SensorData) error { } func (s *Service) UpdateSensor(sensor Sensor) error { - return s.repo.UpdateSensor(sensor) + if err := sensor.Validate(); err != nil { + slog.Error("error validating sensor data", "error", err) + return err + } + + err := s.repo.UpdateSensor(sensor) + if err != nil { + slog.Error("error updating sensor", "error", err) + if strings.Contains(err.Error(), "duplicate key value") { + return ErrSensorAlreadyExists + } + return ErrUpdatingSensor + } + + return nil } -func (s *Service) GetSensor(sensorID string) (Sensor, error) { - return s.repo.ReadSensor(sensorID) +func (s *Service) GetSensor(sensor SensorRequest) (Sensor, error) { + if err := sensor.Validate(); err != nil { + slog.Error("error getting sensor", "error", err) + return Sensor{}, err + } + + return s.repo.ReadSensor(sensor.SensorID) } -func (s *Service) GetValues(sensorID string, from, to time.Time) ([]SensorData, error) { - return s.repo.ReadSensorValues(sensorID, from, to) +func (s *Service) GetValues(sensor SensorDataRequest) ([]SensorData, error) { + if err := sensor.Validate(); err != nil { + slog.Error("error validating sensor data request", "error", err) + return []SensorData{}, err + } + + from, err := time.Parse(time.RFC3339, *sensor.From) + if err != nil { + slog.Error("error parsing from date", "error", err) + return []SensorData{}, err + } + + to, err := time.Parse(time.RFC3339, *sensor.To) + if err != nil { + slog.Error("error parsing to date", "error", err) + return []SensorData{}, err + } + + return s.repo.ReadSensorValues(sensor.SensorID, from, to) } func (s *Service) ListSensors() ([]Sensor, error) { diff --git a/internal/domains/sensors/simulator.go b/internal/domains/sensors/simulator.go index dcba948..5245068 100644 --- a/internal/domains/sensors/simulator.go +++ b/internal/domains/sensors/simulator.go @@ -22,6 +22,9 @@ func Start(nats *broker.NATS) *Simulator { } } +// SimulateSensor simula lo que es un sensor, se llama a ese método como una +// go-rutina separada. Hace uso del SamplingInterval como temporizador para +// el canal ticker. func (s *Simulator) SimulateSensor(sensor Sensor) { s.mu.Lock() stopChan := make(chan bool) @@ -60,6 +63,8 @@ func (s *Simulator) SimulateSensor(sensor Sensor) { } } +// UpdateSensor para la gorutina que haya activa de dicho sensor, y comienza una +// nueva con el intervalo actualizado. func (s *Simulator) UpdateSensor(sensor Sensor) { s.mu.Lock() stopChan, exists := s.stopChannels[sensor.SensorID] @@ -77,6 +82,7 @@ func (s *Simulator) UpdateSensor(sensor Sensor) { slog.Info("simulator updated for sensor", "sensor_id", sensor.SensorID, "new_interval", sensor.SamplingInterval) } +// generateData genera datos aleatorios por cada tipo de sensor. func (s *Simulator) generateData(sensor Sensor) SensorData { now := time.Now() data := SensorData{