package sensors

import (
	"encoding/json"
	"errors"
	"log/slog"
	"time"

	"github.com/nats-io/nats.go"
	"nats-app/internal/iot"
)

// NATS subjects served by Handlers. subjectSensorsData is a prefix: the data
// subscription appends "*" to it.
const (
	subjectSensorsRegister  = "sensors.register"
	subjectSensorsUpdate    = "sensors.update"
	subjectSensorsGet       = "sensors.get"
	subjectSensorsValuesGet = "sensors.values.get"
	subjectSensorsList      = "sensors.list"
	subjectSensorsData      = "sensors.data."
)

// Handlers binds the sensor Service to NATS subscriptions. It embeds
// *iot.IoTDevice, whose NATS field is used for subscribing, and keeps the
// simulator that is started for registered sensors.
type Handlers struct {
	service *Service
	*iot.IoTDevice
	simulator *Simulator
}

// NewHandlers obtains a simulator from Start(iot.NATS), launches one
// SimulateSensor goroutine for every sensor returned by the repository, and
// returns the assembled Handlers.
//
// A failure to read the stored sensors is logged and otherwise ignored: the
// Handlers is still returned, with no simulators started.
func NewHandlers(service *Service, iot *iot.IoTDevice) *Handlers {
	simulator := Start(iot.NATS)

	activeSensors, err := service.repo.ReadAllSensors()
	if err != nil {
		slog.Error("reading all sensors", "error", err)
	}
	for _, sensor := range activeSensors {
		go simulator.SimulateSensor(sensor)
		slog.Info("started simulator for sensor", "sensor_id", sensor.SensorID)
	}

	return &Handlers{
		service:   service,
		IoTDevice: iot,
		simulator: simulator,
	}
}

// handleRequest implements the request/reply pattern for one message.
//
// A non-empty msg.Data is decoded as JSON into a Req (an empty payload leaves
// Req at its zero value), handler is invoked, and its result is sent back as
// JSON. Failures are reported to the requester as {"error": "<message>"}.
func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, error)) {
	// reply sends payload to the requester. There is nobody left to report a
	// failed send to, so it is logged.
	reply := func(payload []byte) {
		if err := msg.Respond(payload); err != nil {
			slog.Error("sending response", "subject", msg.Subject, "error", err)
		}
	}
	// replyError encodes the message with encoding/json rather than string
	// concatenation so that quotes, backslashes or control characters in an
	// error message cannot produce malformed JSON.
	replyError := func(message string) {
		payload, err := json.Marshal(map[string]string{"error": message})
		if err != nil {
			payload = []byte(`{"error":"internal error"}`)
		}
		reply(payload)
	}

	var req Req
	if len(msg.Data) > 0 {
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			replyError("invalid request")
			return
		}
	}

	result, err := handler(req)
	if err != nil {
		replyError(err.Error())
		return
	}

	response, err := json.Marshal(result)
	if err != nil {
		// Without this check a marshal failure would send an empty reply.
		slog.Error("marshaling response", "subject", msg.Subject, "error", err)
		replyError("internal error")
		return
	}
	reply(response)
}

// handlePublish handles a fire-and-forget message: a non-empty msg.Data is
// decoded as JSON into a Req and passed to handler. No reply is sent; decode
// and handler errors are only logged.
func handlePublish[Req any](msg *nats.Msg, handler func(Req) error) {
	var req Req
	if len(msg.Data) > 0 {
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			slog.Error("failed to unmarshal message", "error", err)
			return
		}
	}

	if err := handler(req); err != nil {
		slog.Error("handler error", "error", err)
	}
}

// SetupEndpoints installs every subscription and returns h so the call can be
// chained.
//
// NOTE(review): each endpoint discards whatever h.NATS.Subscribe returns, so
// a subscription that fails to install goes unnoticed. Confirm the method's
// signature and surface the error.
func (h *Handlers) SetupEndpoints() *Handlers {
	h.register()
	h.registerData()
	h.update()
	h.get()
	h.getValues()
	h.list()
	return h
}

// register serves subjectSensorsRegister: it validates the sensor, stores it
// through the service, starts a simulator goroutine for it and replies with
// the sensor as received.
func (h *Handlers) register() {
	h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) {
		handleRequest(msg, func(req Sensor) (Sensor, error) {
			if err := req.Validate(); err != nil {
				slog.Error("error validating sensor", "error", err)
				return Sensor{}, err
			}
			if err := h.service.RegisterSensor(req); err != nil {
				return Sensor{}, err
			}
			go h.simulator.SimulateSensor(req)
			return req, nil
		})
	})
}

// registerData consumes readings published on subjectSensorsData+"*" (the
// NATS single-token wildcard), validates each one and stores it through the
// service. Nothing is replied; failures are logged.
func (h *Handlers) registerData() {
	h.NATS.Subscribe(subjectSensorsData+"*", func(msg *nats.Msg) {
		handlePublish(msg, func(data SensorData) error {
			if err := data.Validate(); err != nil {
				slog.Error("error validating sensor data", "error", err)
				return err
			}
			if err := h.service.RegisterSensorData(data); err != nil {
				slog.Error("failed to save sensor data", "error", err, "sensor_id", data.SensorID)
				return err
			}
			slog.Debug("sensor data saved", "sensor_id", data.SensorID, "value", data.Value)
			return nil
		})
	})
}

// update serves subjectSensorsUpdate: it validates the sensor, updates it
// through the service and replies with the sensor as received.
func (h *Handlers) update() {
	h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) {
		handleRequest(msg, func(req Sensor) (Sensor, error) {
			if err := req.Validate(); err != nil {
				return Sensor{}, err
			}
			if err := h.service.UpdateSensor(req); err != nil {
				return Sensor{}, err
			}
			return req, nil
		})
	})
}

// get serves subjectSensorsGet by replying with the sensor whose ID is given
// in the request.
func (h *Handlers) get() {
	h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) {
		handleRequest(msg, func(req SensorRequest) (Sensor, error) {
			return h.service.GetSensor(req.SensorID)
		})
	})
}

// getValues serves subjectSensorsValuesGet: it validates the request, parses
// the From and To bounds as RFC 3339 timestamps and replies with the result of
// Service.GetValues for that sensor and range.
func (h *Handlers) getValues() {
	h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) {
		handleRequest(msg, func(req SensorDataRequest) (Sensor, error) {
			if err := req.Validate(); err != nil {
				return Sensor{}, err
			}
			// Validate is defined outside this file; guard the pointers here
			// so a missing bound can never panic the subscription callback.
			if req.From == nil || req.To == nil {
				return Sensor{}, errors.New("from and to are required")
			}

			from, err := time.Parse(time.RFC3339, *req.From)
			if err != nil {
				return Sensor{}, err
			}
			to, err := time.Parse(time.RFC3339, *req.To)
			if err != nil {
				return Sensor{}, err
			}
			return h.service.GetValues(req.SensorID, from, to)
		})
	})
}

// list serves subjectSensorsList by replying with every sensor known to the
// service. The request carries no fields.
func (h *Handlers) list() {
	h.NATS.Subscribe(subjectSensorsList, func(msg *nats.Msg) {
		handleRequest(msg, func(req struct{}) ([]Sensor, error) {
			return h.service.ListSensors()
		})
	})
}