add stop and update simulator

This commit is contained in:
Pedro Pérez 2025-10-10 00:29:12 +02:00
parent 65be69d164
commit f5013a88a2
2 changed files with 50 additions and 17 deletions

View File

@ -141,6 +141,8 @@ func (h *Handlers) update() {
return Sensor{}, err return Sensor{}, err
} }
h.simulator.UpdateSensor(req)
return req, nil return req, nil
}) })
}) })

View File

@ -5,45 +5,76 @@ import (
"log/slog" "log/slog"
"math/rand" "math/rand"
"nats-app/internal/broker" "nats-app/internal/broker"
"sync"
"time" "time"
) )
type Simulator struct { type Simulator struct {
*broker.NATS *broker.NATS
stopChannels map[string]chan bool
mu sync.Mutex
} }
func Start(nats *broker.NATS) *Simulator { func Start(nats *broker.NATS) *Simulator {
return &Simulator{ return &Simulator{
NATS: nats, NATS: nats,
stopChannels: make(map[string]chan bool),
} }
} }
func (s *Simulator) SimulateSensor(sensor Sensor) { func (s *Simulator) SimulateSensor(sensor Sensor) {
s.mu.Lock()
stopChan := make(chan bool)
s.stopChannels[sensor.SensorID] = stopChan
s.mu.Unlock()
ticker := time.NewTicker(*sensor.SamplingInterval * time.Second) ticker := time.NewTicker(*sensor.SamplingInterval * time.Second)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for {
data := s.generateData(sensor) select {
case <-stopChan:
slog.Info("stopping simulator for sensor", "sensor_id", sensor.SensorID)
return
case <-ticker.C:
data := s.generateData(sensor)
if data.SensorID == "" { if data.SensorID == "" {
slog.Warn("sensor data generation failed", "sensor_id", sensor.SensorID) slog.Warn("sensor data generation failed", "sensor_id", sensor.SensorID)
continue continue
} }
payload, err := json.Marshal(data) payload, err := json.Marshal(data)
if err != nil { if err != nil {
slog.Error("failed to marshal sensor data", "error", err, "sensor_id", sensor.SensorID) slog.Error("failed to marshal sensor data", "error", err, "sensor_id", sensor.SensorID)
continue continue
} }
subject := subjectSensorsData + sensor.SensorID subject := subjectSensorsData + sensor.SensorID
if err := s.Publish(subject, payload); err != nil { if err := s.Publish(subject, payload); err != nil {
slog.Error("failed to publish sensor data", "error", err, "subject", subject) slog.Error("failed to publish sensor data", "error", err, "subject", subject)
} else { } else {
slog.Debug("sensor data published", "sensor_id", sensor.SensorID, "value", data.Value) slog.Debug("sensor data published", "sensor_id", sensor.SensorID, "value", data.Value)
}
} }
} }
}
func (s *Simulator) UpdateSensor(sensor Sensor) {
s.mu.Lock()
stopChan, exists := s.stopChannels[sensor.SensorID]
s.mu.Unlock()
if exists {
stopChan <- true
s.mu.Lock()
delete(s.stopChannels, sensor.SensorID)
s.mu.Unlock()
}
go s.SimulateSensor(sensor)
slog.Info("simulator updated for sensor", "sensor_id", sensor.SensorID, "new_interval", sensor.SamplingInterval)
} }
func (s *Simulator) generateData(sensor Sensor) SensorData { func (s *Simulator) generateData(sensor Sensor) SensorData {