diff --git a/internal/domains/sensors/handlers.go b/internal/domains/sensors/handlers.go index 32eb1e7..4328ca9 100644 --- a/internal/domains/sensors/handlers.go +++ b/internal/domains/sensors/handlers.go @@ -141,6 +141,8 @@ func (h *Handlers) update() { return Sensor{}, err } + h.simulator.UpdateSensor(req) + return req, nil }) }) diff --git a/internal/domains/sensors/simulator.go b/internal/domains/sensors/simulator.go index 901dd53..dcba948 100644 --- a/internal/domains/sensors/simulator.go +++ b/internal/domains/sensors/simulator.go @@ -5,45 +5,76 @@ import ( "log/slog" "math/rand" "nats-app/internal/broker" + "sync" "time" ) type Simulator struct { *broker.NATS + stopChannels map[string]chan bool + mu sync.Mutex } func Start(nats *broker.NATS) *Simulator { return &Simulator{ - NATS: nats, + NATS: nats, + stopChannels: make(map[string]chan bool), } } func (s *Simulator) SimulateSensor(sensor Sensor) { + s.mu.Lock() + stopChan := make(chan bool) + s.stopChannels[sensor.SensorID] = stopChan + s.mu.Unlock() + ticker := time.NewTicker(*sensor.SamplingInterval * time.Second) defer ticker.Stop() - for range ticker.C { - data := s.generateData(sensor) + for { + select { + case <-stopChan: + slog.Info("stopping simulator for sensor", "sensor_id", sensor.SensorID) + return + case <-ticker.C: + data := s.generateData(sensor) - if data.SensorID == "" { - slog.Warn("sensor data generation failed", "sensor_id", sensor.SensorID) - continue - } + if data.SensorID == "" { + slog.Warn("sensor data generation failed", "sensor_id", sensor.SensorID) + continue + } - payload, err := json.Marshal(data) - if err != nil { - slog.Error("failed to marshal sensor data", "error", err, "sensor_id", sensor.SensorID) - continue - } + payload, err := json.Marshal(data) + if err != nil { + slog.Error("failed to marshal sensor data", "error", err, "sensor_id", sensor.SensorID) + continue + } - subject := subjectSensorsData + sensor.SensorID - if err := s.Publish(subject, payload); err != nil { - slog.Error("failed to publish sensor data", "error", err, "subject", subject) - } else { - slog.Debug("sensor data published", "sensor_id", sensor.SensorID, "value", data.Value) + subject := subjectSensorsData + sensor.SensorID + if err := s.Publish(subject, payload); err != nil { + slog.Error("failed to publish sensor data", "error", err, "subject", subject) + } else { + slog.Debug("sensor data published", "sensor_id", sensor.SensorID, "value", data.Value) + } } } +} +func (s *Simulator) UpdateSensor(sensor Sensor) { + s.mu.Lock() + stopChan, exists := s.stopChannels[sensor.SensorID] + s.mu.Unlock() + + if exists { + stopChan <- true + + s.mu.Lock() + delete(s.stopChannels, sensor.SensorID) + s.mu.Unlock() + } + + go s.SimulateSensor(sensor) + slog.Info("simulator updated for sensor", "sensor_id", sensor.SensorID, "new_interval", sensor.SamplingInterval) } func (s *Simulator) generateData(sensor Sensor) SensorData {