194 lines
4.3 KiB
Go
194 lines
4.3 KiB
Go
package sensors
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log/slog"
|
|
"nats-app/internal/iot"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
const (
|
|
subjectSensorsRegister = "sensors.register"
|
|
subjectSensorsUpdate = "sensors.update"
|
|
subjectSensorsGet = "sensors.get"
|
|
subjectSensorsValuesGet = "sensors.values.get"
|
|
subjectSensorsList = "sensors.list"
|
|
subjectSensorsData = "sensors.data."
|
|
)
|
|
|
|
type Handlers struct {
|
|
service *Service
|
|
*iot.IoTDevice
|
|
simulator *Simulator
|
|
}
|
|
|
|
func NewHandlers(service *Service, iot *iot.IoTDevice) *Handlers {
|
|
|
|
simulator := Start(iot.NATS)
|
|
activeSensors, err := service.repo.ReadAllSensors()
|
|
if err != nil {
|
|
slog.Error("reading all sensors", "error", err)
|
|
}
|
|
|
|
for _, sensor := range activeSensors {
|
|
go simulator.SimulateSensor(sensor)
|
|
slog.Info("started simulator for sensor", "sensor_id", sensor.SensorID)
|
|
}
|
|
|
|
sensors, _ := service.repo.ReadAllSensors()
|
|
slog.Info("sensors", "sens", sensors)
|
|
|
|
return &Handlers{
|
|
service: service,
|
|
IoTDevice: iot,
|
|
simulator: simulator,
|
|
}
|
|
}
|
|
|
|
func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, error)) {
|
|
var req Req
|
|
if len(msg.Data) > 0 {
|
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
|
msg.Respond([]byte(`{"error":"invalid request"}`))
|
|
return
|
|
}
|
|
}
|
|
|
|
result, err := handler(req)
|
|
if err != nil {
|
|
msg.Respond([]byte(`{"error":"` + err.Error() + `"}`))
|
|
return
|
|
}
|
|
|
|
response, _ := json.Marshal(result)
|
|
msg.Respond(response)
|
|
}
|
|
|
|
func handlePublish[Req any](msg *nats.Msg, handler func(Req) error) {
|
|
var req Req
|
|
if len(msg.Data) > 0 {
|
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
|
slog.Error("failed to unmarshal message", "error", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := handler(req); err != nil {
|
|
slog.Error("handler error", "error", err)
|
|
}
|
|
}
|
|
|
|
func (h *Handlers) SetupEndpoints() *Handlers {
|
|
h.register()
|
|
h.registerData()
|
|
h.update()
|
|
h.get()
|
|
h.getValues()
|
|
h.list()
|
|
return h
|
|
}
|
|
|
|
func (h *Handlers) register() {
|
|
h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) {
|
|
handleRequest(msg, func(req Sensor) (Sensor, error) {
|
|
if err := req.Validate(); err != nil {
|
|
slog.Error("error validating sensor", "error", err)
|
|
return Sensor{}, err
|
|
}
|
|
|
|
if err := h.service.RegisterSensor(req); err != nil {
|
|
return Sensor{}, err
|
|
}
|
|
|
|
go h.simulator.SimulateSensor(req)
|
|
|
|
return req, nil
|
|
})
|
|
})
|
|
}
|
|
|
|
func (h *Handlers) registerData() {
|
|
h.NATS.Subscribe(subjectSensorsData+"*", func(msg *nats.Msg) {
|
|
handlePublish(msg, func(data SensorData) error {
|
|
if err := data.Validate(); err != nil {
|
|
slog.Error("error validating sensor data", "error", err)
|
|
return err
|
|
}
|
|
|
|
if err := h.service.RegisterSensorData(data); err != nil {
|
|
slog.Error("failed to save sensor data", "error", err, "sensor_id", data.SensorID)
|
|
return err
|
|
}
|
|
|
|
slog.Debug("sensor data saved", "sensor_id", data.SensorID, "value", data.Value)
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
func (h *Handlers) update() {
|
|
h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) {
|
|
handleRequest(msg, func(req Sensor) (Sensor, error) {
|
|
slog.Debug("calling sensor.update", "payload", req)
|
|
|
|
if err := req.Validate(); err != nil {
|
|
return Sensor{}, err
|
|
}
|
|
|
|
if err := h.service.UpdateSensor(req); err != nil {
|
|
return Sensor{}, err
|
|
}
|
|
|
|
h.simulator.UpdateSensor(req)
|
|
|
|
return req, nil
|
|
})
|
|
})
|
|
}
|
|
|
|
func (h *Handlers) get() {
|
|
h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) {
|
|
handleRequest(msg, func(req SensorRequest) (Sensor, error) {
|
|
slog.Debug("calling sensor.get", "payload", req)
|
|
|
|
if err := req.Validate(); err != nil {
|
|
return Sensor{}, err
|
|
}
|
|
|
|
return h.service.GetSensor(req.SensorID)
|
|
})
|
|
})
|
|
}
|
|
|
|
func (h *Handlers) getValues() {
|
|
h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) {
|
|
handleRequest(msg, func(req SensorDataRequest) ([]SensorData, error) {
|
|
if err := req.Validate(); err != nil {
|
|
return []SensorData{}, err
|
|
}
|
|
|
|
from, err := time.Parse(time.RFC3339, *req.From)
|
|
if err != nil {
|
|
return []SensorData{}, err
|
|
}
|
|
|
|
to, err := time.Parse(time.RFC3339, *req.To)
|
|
if err != nil {
|
|
return []SensorData{}, err
|
|
}
|
|
|
|
return h.service.GetValues(req.SensorID, from, to)
|
|
})
|
|
})
|
|
}
|
|
|
|
func (h *Handlers) list() {
|
|
h.NATS.Subscribe(subjectSensorsList, func(msg *nats.Msg) {
|
|
handleRequest(msg, func(req struct{}) ([]Sensor, error) {
|
|
return h.service.ListSensors()
|
|
})
|
|
})
|
|
}
|