add register data mechanism
This commit is contained in:
parent
b012db856c
commit
fcc0d06f96
@ -15,6 +15,7 @@ const (
|
|||||||
subjectSensorsGet = "sensors.get"
|
subjectSensorsGet = "sensors.get"
|
||||||
subjectSensorsValuesGet = "sensors.values.get"
|
subjectSensorsValuesGet = "sensors.values.get"
|
||||||
subjectSensorsList = "sensors.list"
|
subjectSensorsList = "sensors.list"
|
||||||
|
subjectSensorsData = "sensors.data."
|
||||||
)
|
)
|
||||||
|
|
||||||
type Handlers struct {
|
type Handlers struct {
|
||||||
@ -48,8 +49,23 @@ func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, erro
|
|||||||
msg.Respond(response)
|
msg.Respond(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handlePublish[Req any](msg *nats.Msg, handler func(Req) error) {
|
||||||
|
var req Req
|
||||||
|
if len(msg.Data) > 0 {
|
||||||
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
||||||
|
slog.Error("failed to unmarshal message", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := handler(req); err != nil {
|
||||||
|
slog.Error("handler error", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handlers) SetupEndpoints() *Handlers {
|
func (h *Handlers) SetupEndpoints() *Handlers {
|
||||||
h.register()
|
h.register()
|
||||||
|
h.registerData()
|
||||||
h.update()
|
h.update()
|
||||||
h.get()
|
h.get()
|
||||||
h.getValues()
|
h.getValues()
|
||||||
@ -74,6 +90,25 @@ func (h *Handlers) register() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handlers) registerData() {
|
||||||
|
h.NATS.Subscribe(subjectSensorsData+"*", func(msg *nats.Msg) {
|
||||||
|
handlePublish(msg, func(data SensorData) error {
|
||||||
|
if err := data.Validate(); err != nil {
|
||||||
|
slog.Error("error validating sensor data", "error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.service.RegisterSensorData(data); err != nil {
|
||||||
|
slog.Error("failed to save sensor data", "error", err, "sensor_id", data.SensorID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Debug("sensor data saved", "sensor_id", data.SensorID, "value", data.Value)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handlers) update() {
|
func (h *Handlers) update() {
|
||||||
h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) {
|
h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) {
|
||||||
handleRequest(msg, func(req Sensor) (Sensor, error) {
|
handleRequest(msg, func(req Sensor) (Sensor, error) {
|
||||||
|
|||||||
@ -35,8 +35,11 @@ func (p *pgxRepo) CreateSensor(s Sensor) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const createSensorDataQuery = `insert into registry (sensor_id, value, created_at) values ($1, $2, $3)`
|
||||||
|
|
||||||
func (p *pgxRepo) CreateSensorData(s SensorData) error {
|
func (p *pgxRepo) CreateSensorData(s SensorData) error {
|
||||||
return nil
|
_, err := p.Exec(context.Background(), createSensorDataQuery, s.SensorID, s.Value, s.Timestamp)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pgxRepo) UpdateSensor(s Sensor) error {
|
func (p *pgxRepo) UpdateSensor(s Sensor) error {
|
||||||
@ -106,7 +109,32 @@ func (i *inMemory) CreateSensor(s Sensor) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *inMemory) CreateSensorData(s SensorData) error {
|
func (i *inMemory) CreateSensorData(data SensorData) error {
|
||||||
|
i.mu.Lock()
|
||||||
|
defer i.mu.Unlock()
|
||||||
|
|
||||||
|
sensor, exists := i.sensors[data.SensorID]
|
||||||
|
if !exists {
|
||||||
|
return ErrSensorNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
if sensor.SensorData == nil {
|
||||||
|
sensor.SensorData = &map[int]SensorData{}
|
||||||
|
}
|
||||||
|
|
||||||
|
key := int(data.Timestamp.Unix())
|
||||||
|
(*sensor.SensorData)[key] = data
|
||||||
|
|
||||||
|
if len(*sensor.SensorData) > 100 {
|
||||||
|
oldestKey := key
|
||||||
|
for k := range *sensor.SensorData {
|
||||||
|
if k < oldestKey {
|
||||||
|
oldestKey = k
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(*sensor.SensorData, oldestKey)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,7 +197,12 @@ func (d *DecoratorRepo) CreateSensor(s Sensor) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *DecoratorRepo) CreateSensorData(s SensorData) error {
|
func (d *DecoratorRepo) CreateSensorData(data SensorData) error {
|
||||||
|
if err := d.db.CreateSensorData(data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = d.memory.CreateSensorData(data)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -26,6 +26,16 @@ func (s *Service) RegisterSensor(sensor Sensor) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) RegisterSensorData(data SensorData) error {
|
||||||
|
err := s.repo.CreateSensorData(data)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("error registering sensor data")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) UpdateSensor(sensor Sensor) error {
|
func (s *Service) UpdateSensor(sensor Sensor) error {
|
||||||
|
|
||||||
s.repo.UpdateSensor(sensor)
|
s.repo.UpdateSensor(sensor)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user