Compare commits

..

No commits in common. "1f6d0a58e78359b149272a12a479ee52ca629d42" and "9e72b5891722515c412978f70c7a65102dc72104" have entirely different histories.

12 changed files with 75 additions and 1520 deletions

View File

@ -25,12 +25,6 @@ dockerize-nats:
migrateup: migrateup:
go run -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest -path app/database -database "postgresql://developer:secret@localhost:5432/$(DB_NAME)?sslmode=disable" -verbose up go run -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest -path app/database -database "postgresql://developer:secret@localhost:5432/$(DB_NAME)?sslmode=disable" -verbose up
.PHONY: mock
#Mock database
mock:
go run go.uber.org/mock/mockgen@latest -package mock -destination internal/domains/sensors/mock/querier.go $(MOD_NAME)/internal/domains/sensors Repository
.PHONY: run .PHONY: run
# Start app in development environment # Start app in development environment
run: run:

120
README.md
View File

@ -9,67 +9,6 @@ por el puesto de programador Go.
- NATS CLI - NATS CLI
- Make, si prefieres la comodidad de usar Makefile - Make, si prefieres la comodidad de usar Makefile
## Comandos
### Registrar un sensor
- Campos obligatorios: `sensor_id` y `sensor_type`.
- Campos opcionales: `sampling`, `thresholdabove` y `thresholdbelow`.
`nats req sensors.register '{
"sensor_id": "sensor-001",
"sensor_type": "temperature",
"sampling": 3600,
"thresoldabove": 50.0,
"thresoldbelow": -10.0
}'`
### Actualizar configuración sensor
- Campo obligatorio: `sensor_id` y la presencia de al menos un parámetro.
`nats req sensors.update '{
"sensor_id": "sensor-001",
"sensor_type": "temperature",
"sampling": 10,
"thresoldabove": 100.0,
"thresoldbelow": -15.0
}'`
### Obtener información de un sensor
- Campo obligatorio: `sensor_id`.
`nats req sensors.get '{
"sensor_id": "sensor-001"
}'`
### Obtener valores de un sensor
- Campo obligatorio: `sensor_id`.
- Campos opcionales: `from` y `to` en formato RFC3339. Si no se especifican, se toman los últimos 7 días.
`nats req sensors.values.get '{
"sensor_id": "sensor-001",
"from": "2025-10-03T00:00:00Z",
"to": "2025-10-10T23:59:59Z"
}'`
### Obtener listado de sensores
No hay _payload_, pero hay que poner comillas dobles o si no se queda esperando
una entrada de datos.
`nats req sensors.list ""`
### Suscribirse a un sensor
`nats sub sensors.data.sensor-001`
### Suscribirse a todos los sensores
`nats sub sensors.data.*`
## Consideraciones ## Consideraciones
Hay partes de códigos que son _snippets_ extraídos de una librería de autoría Hay partes de códigos que son _snippets_ extraídos de una librería de autoría
@ -77,8 +16,6 @@ propia. [Repositorio GitHub](https://github.com/zepyrshut/gopher-toolbox). De
las cuales son: las cuales son:
- El _logger_ usando la _stdlib log/slog_. - El _logger_ usando la _stdlib log/slog_.
- La conexión con la base de datos, usando el controlador [pgx](https://github.com/jackc/pgx).
## Bitácora ## Bitácora
@ -120,60 +57,3 @@ Para el registro de valores y mantener ambos se ha usado el patrón decorador qu
bajo un mismo _struct_ se incluye las dos implementaciones y se llama a ambas bajo un mismo _struct_ se incluye las dos implementaciones y se llama a ambas
funciones. Desde la capa servicios sólo tiene que llamar al decorador sin saber funciones. Desde la capa servicios sólo tiene que llamar al decorador sin saber
los detalles de la implementación. los detalles de la implementación.
### Continuamos con los servicios
Con el repositorio sin implementar, se puede realizar los servicios. En ese
proyecto ha quedado muy básico, sirviendo solamente de enlace entre los
controladores y el repositorio.
Cuando se creó el _broker_ de NATS, inicialmente fue para crear una interfaz de
mensajería donde se pudiese manejar _websockets_, _SSE_ y otras mensajerías pero
se consideró que había otras prioridades. Así se quedó.
### Y finalmente los controladores
Ahí tuve muchas dudas con el entendimiento de NATS, estaba muy arraigado en el
patrón REST, y cambiar la mentalidad costó un poco, al final mirando un poco la
documentación me quedé con los conceptos clave:
1. Está basado en asuntos, los canales se crean de forma jerárquica.
2. Cuando se hace un _subscribe_, se suma a un canal del asunto dado.
3. Para escribir en el canal, hay que hacer el _publish_.
4. Finalmente para solicitar un recurso, está el _request_.
Esto es todo, entonces los controladores de la entidad _sensors_ están
constituidos por una serie de _endpoints_ haciendo las acciones que se solicita.
## LLMS
He usado Claude para la toma de decisiones y ayuda con el _boilerplate_, que no
es poca cosa, además también se ha usado para la generación de las pruebas
unitarias, además de resolución de algunos problemas complejos.
## Generadores de código
Existen generadores de código para Golang, de hecho, se fomenta su desarrollo,
hay un artículo interesante de Rob Pike [hablando sobre ello](https://go.dev/blog/generate).
Muchas de las herramientas son muy interesantes usarlas ya que acelera mucho la
generación de código repetitivo. Da mas confianza usar esas herramientas que la
IA.
Para las consultas SQL con seguridad de tipos, existe la herramienta [sqlc](https://sqlc.dev/).
Para ese proyecto sólo se ha usado [GoMock](https://github.com/uber-go/mock),
mantenida por Uber y sirve para usar la interfaz `Repository` sin usar una
base de datos real.
Por otro lado, hubiese sido interesante incorporar [ginkgo](https://onsi.github.io/ginkgo/),
es un marco de trabajo de pruebas unitarias usando un lenguaje de dominio
específico (DSL).
> En lugar de escribir una prueba así: `Test_Validate(t *testing.T) { ... }`
> se puede hacer de la siguiente manera: `var _ Describe("Models", func () { ... })`,
> y dentro del cuerpo se describen las pruebas a realizar.
No se ha incorporado porque hay que instalar la herramienta que ejecutan las
pruebas, y no quería correr el riesgo de que no funcionase en otro equipo o no
diesen los resultados esperados. Que se podría haber usado un contenedor Docker,
sí, pero la prueba no consiste en eso.

3
go.mod
View File

@ -5,7 +5,6 @@ go 1.25.1
require ( require (
github.com/jackc/pgx/v5 v5.7.6 github.com/jackc/pgx/v5 v5.7.6
github.com/nats-io/nats.go v1.46.1 github.com/nats-io/nats.go v1.46.1
go.uber.org/mock v0.6.0
) )
require ( require (
@ -16,7 +15,7 @@ require (
github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.37.0 // indirect golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.16.0 // indirect golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.24.0 // indirect
) )

10
go.sum
View File

@ -22,14 +22,12 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=

View File

@ -1,114 +1 @@
package sensors package sensors
import (
"errors"
"slices"
"time"
)
func (s *Sensor) Validate() error {
if s.SensorID == "" {
return ErrInvalidSensorIdentifier
}
if s.SensorType == "" {
return ErrInvalidSensorType
}
validTypes := []SType{Temperature, Humidity, CarbonDioxide, Pressure, Proximity, Light}
isValid := slices.Contains(validTypes, s.SensorType)
if !isValid {
return ErrInvalidSensorType
}
if s.SamplingInterval == nil {
defaultInterval := time.Second * 3600
s.SamplingInterval = &defaultInterval
}
if s.ThresholdAbove == nil {
defaultAbove := 100.0
s.ThresholdAbove = &defaultAbove
}
if s.ThresholdBelow == nil {
defaultBelow := 0.0
s.ThresholdBelow = &defaultBelow
}
return nil
}
func (d *SensorData) Validate() error {
if d.SensorID == "" {
return ErrInvalidSensorIdentifier
}
if d.Value == nil {
return ErrMissingValue
}
if d.Timestamp == nil {
now := time.Now()
d.Timestamp = &now
}
return nil
}
// TODO: implement this in service layer for alerts
func (d *SensorData) IsOutOfRangeAbove(sensor Sensor) bool {
if d.Value == nil || sensor.ThresholdAbove == nil {
return false
}
return *d.Value > *sensor.ThresholdAbove
}
func (d *SensorData) IsOutOfRangeBelow(sensor Sensor) bool {
if d.Value == nil || sensor.ThresholdBelow == nil {
return false
}
return *d.Value < *sensor.ThresholdBelow
}
func (r *SensorRequest) Validate() error {
if r.SensorID == "" {
return ErrInvalidSensorIdentifier
}
return nil
}
func (r *SensorDataRequest) Validate() error {
if r.SensorID == "" {
return ErrInvalidSensorIdentifier
}
if r.To == nil || *r.To == "" {
defaultTo := time.Now().Format(time.RFC3339)
r.To = &defaultTo
} else {
if _, err := time.Parse(time.RFC3339, *r.To); err != nil {
defaultTo := time.Now().Format(time.RFC3339)
r.To = &defaultTo
}
}
if r.From == nil || *r.From == "" {
defaultFrom := time.Now().AddDate(0, 0, -7).Format(time.RFC3339)
r.From = &defaultFrom
} else {
if _, err := time.Parse(time.RFC3339, *r.From); err != nil {
defaultFrom := time.Now().AddDate(0, 0, -7).Format(time.RFC3339)
r.From = &defaultFrom
}
}
return nil
}
var (
ErrInvalidSensorIdentifier = errors.New("sensor identifier is required")
ErrInvalidSensorType = errors.New("sensor type is required")
ErrSensorNotFound = errors.New("sensor not found")
ErrMissingValue = errors.New("sensor value no provided")
ErrSensorAlreadyExists = errors.New("sensor already exists")
)

View File

@ -1,652 +0,0 @@
package sensors
import (
"testing"
"time"
)
func Test_SensorValidate(t *testing.T) {
type testCase struct {
name string
given Sensor
expected Sensor
expecErr bool
}
tests := []testCase{
{
name: "success with all fields",
given: Sensor{
SensorID: "temp-001",
SensorType: "temperature",
SamplingInterval: ptr(time.Hour * 24),
ThresholdAbove: ptr(50.0),
ThresholdBelow: ptr(-10.0),
},
expected: Sensor{
SensorID: "temp-001",
SensorType: "temperature",
SamplingInterval: ptr(time.Hour * 24),
ThresholdAbove: ptr(50.0),
ThresholdBelow: ptr(-10.0),
},
expecErr: false,
},
{
name: "error when sensor_id is empty",
given: Sensor{
SensorID: "",
SensorType: "temperature",
},
expecErr: true,
},
{
name: "error when sensor_type is empty",
given: Sensor{
SensorID: "temp-001",
SensorType: "",
},
expecErr: true,
},
{
name: "sensor type not in const",
given: Sensor{
SensorID: "temp-001",
SensorType: "unknown",
},
expecErr: true,
},
{
name: "default sampling_interval when nil",
given: Sensor{
SensorID: "temp-002",
SensorType: "humidity",
},
expected: Sensor{
SensorID: "temp-002",
SensorType: "humidity",
SamplingInterval: ptr(time.Second * 3600),
ThresholdAbove: ptr(100.0),
ThresholdBelow: ptr(0.0),
},
expecErr: false,
},
{
name: "default threshold_above when nil",
given: Sensor{
SensorID: "temp-003",
SensorType: "pressure",
SamplingInterval: ptr(time.Minute * 5),
},
expected: Sensor{
SensorID: "temp-003",
SensorType: "pressure",
SamplingInterval: ptr(time.Minute * 5),
ThresholdAbove: ptr(100.0),
ThresholdBelow: ptr(0.0),
},
expecErr: false,
},
{
name: "default threshold_below when nil",
given: Sensor{
SensorID: "temp-004",
SensorType: "light",
SamplingInterval: ptr(time.Second * 30),
ThresholdAbove: ptr(200.0),
},
expected: Sensor{
SensorID: "temp-004",
SensorType: "light",
SamplingInterval: ptr(time.Second * 30),
ThresholdAbove: ptr(200.0),
ThresholdBelow: ptr(0.0),
},
expecErr: false,
},
{
name: "zero values are preserved",
given: Sensor{
SensorID: "temp-005",
SensorType: "temperature",
SamplingInterval: ptr(time.Second * 10),
ThresholdAbove: ptr(0.0),
ThresholdBelow: ptr(0.0),
},
expected: Sensor{
SensorID: "temp-005",
SensorType: "temperature",
SamplingInterval: ptr(time.Second * 10),
ThresholdAbove: ptr(0.0),
ThresholdBelow: ptr(0.0),
},
expecErr: false,
},
{
name: "negative threshold_below is valid",
given: Sensor{
SensorID: "temp-006",
SensorType: "temperature",
SamplingInterval: ptr(time.Minute * 2),
ThresholdAbove: ptr(35.0),
ThresholdBelow: ptr(-20.5),
},
expected: Sensor{
SensorID: "temp-006",
SensorType: "temperature",
SamplingInterval: ptr(time.Minute * 2),
ThresholdAbove: ptr(35.0),
ThresholdBelow: ptr(-20.5),
},
expecErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.given.Validate()
if tt.expecErr && err == nil {
t.Errorf("expected error, got nil")
return
}
if !tt.expecErr && err != nil {
t.Errorf("unexpected error: %v", err)
return
}
if tt.expecErr {
return
}
if tt.given.SensorID != tt.expected.SensorID {
t.Errorf("SensorID: expected %q, got %q", tt.expected.SensorID, tt.given.SensorID)
}
if tt.given.SensorType != tt.expected.SensorType {
t.Errorf("expected %q, got %q", tt.expected.SensorType, tt.given.SensorType)
}
if tt.given.SamplingInterval == nil || tt.expected.SamplingInterval == nil {
if tt.given.SamplingInterval != tt.expected.SamplingInterval {
t.Errorf("expected %v, got %v", tt.expected.SamplingInterval, tt.given.SamplingInterval)
}
} else if *tt.given.SamplingInterval != *tt.expected.SamplingInterval {
t.Errorf("expected %v, got %v", *tt.expected.SamplingInterval, *tt.given.SamplingInterval)
}
if tt.given.ThresholdAbove == nil || tt.expected.ThresholdAbove == nil {
if tt.given.ThresholdAbove != tt.expected.ThresholdAbove {
t.Errorf("expected %v, got %v", tt.expected.ThresholdAbove, tt.given.ThresholdAbove)
}
} else if *tt.given.ThresholdAbove != *tt.expected.ThresholdAbove {
t.Errorf("expected %v, got %v", *tt.expected.ThresholdAbove, *tt.given.ThresholdAbove)
}
if tt.given.ThresholdBelow == nil || tt.expected.ThresholdBelow == nil {
if tt.given.ThresholdBelow != tt.expected.ThresholdBelow {
t.Errorf("expected %v, got %v", tt.expected.ThresholdBelow, tt.given.ThresholdBelow)
}
} else if *tt.given.ThresholdBelow != *tt.expected.ThresholdBelow {
t.Errorf("expected %v, got %v", *tt.expected.ThresholdBelow, *tt.given.ThresholdBelow)
}
})
}
}
func Test_SensorData_IsOutOfRangeAbove(t *testing.T) {
type testCase struct {
name string
data SensorData
sensor Sensor
expected bool
}
tests := []testCase{
{
name: "value above threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(150.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdAbove: ptr(100.0),
},
expected: true,
},
{
name: "value below threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(50.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdAbove: ptr(100.0),
},
expected: false,
},
{
name: "value equal to threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(100.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdAbove: ptr(100.0),
},
expected: false,
},
{
name: "negative value above negative threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(-5.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdAbove: ptr(-10.0),
},
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.data.IsOutOfRangeAbove(tt.sensor)
if result != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, result)
}
})
}
}
func Test_SensorData_IsOutOfRangeBelow(t *testing.T) {
type testCase struct {
name string
data SensorData
sensor Sensor
expected bool
}
tests := []testCase{
{
name: "value below threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(5.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdBelow: ptr(10.0),
},
expected: true,
},
{
name: "value above threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(50.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdBelow: ptr(10.0),
},
expected: false,
},
{
name: "value equal to threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(10.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdBelow: ptr(10.0),
},
expected: false,
},
{
name: "negative value below threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(-15.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdBelow: ptr(-10.0),
},
expected: true,
},
{
name: "zero value below positive threshold",
data: SensorData{
SensorID: "temp-001",
Value: ptr(0.0),
Timestamp: ptr(time.Now()),
},
sensor: Sensor{
SensorID: "temp-001",
ThresholdBelow: ptr(5.0),
},
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.data.IsOutOfRangeBelow(tt.sensor)
if result != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, result)
}
})
}
}
func Test_SensorRequest_Validate(t *testing.T) {
type testCase struct {
name string
given SensorRequest
expecErr bool
}
tests := []testCase{
{
name: "valid request with sensor_id",
given: SensorRequest{
SensorID: "temp-001",
},
expecErr: false,
},
{
name: "error when sensor_id is empty",
given: SensorRequest{
SensorID: "",
},
expecErr: true,
},
{
name: "valid request with long sensor_id",
given: SensorRequest{
SensorID: "sensor-with-very-long-identifier-12345",
},
expecErr: false,
},
{
name: "valid request with special characters",
given: SensorRequest{
SensorID: "sensor-001_test",
},
expecErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.given.Validate()
if tt.expecErr && err == nil {
t.Errorf("expected error, got nil")
return
}
if !tt.expecErr && err != nil {
t.Errorf("unexpected error: %v", err)
return
}
})
}
}
func Test_SensorDataRequest_Validate(t *testing.T) {
type testCase struct {
name string
given SensorDataRequest
expecErr bool
checkFn func(t *testing.T, req SensorDataRequest)
}
now := time.Now()
weekAgo := now.AddDate(0, 0, -7)
validFrom := weekAgo.Format(time.RFC3339)
validTo := now.Format(time.RFC3339)
tests := []testCase{
{
name: "valid request with all fields",
given: SensorDataRequest{
SensorID: "temp-001",
From: ptr(validFrom),
To: ptr(validTo),
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.From == nil || *req.From != validFrom {
t.Errorf("expected From to be %q, got %v", validFrom, req.From)
}
if req.To == nil || *req.To != validTo {
t.Errorf("expected To to be %q, got %v", validTo, req.To)
}
},
},
{
name: "error when sensor_id is empty",
given: SensorDataRequest{
SensorID: "",
From: ptr(validFrom),
To: ptr(validTo),
},
expecErr: true,
},
{
name: "default To when nil",
given: SensorDataRequest{
SensorID: "temp-001",
From: ptr(validFrom),
To: nil,
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.To == nil {
t.Error("expected To to be set with default value")
return
}
parsed, err := time.Parse(time.RFC3339, *req.To)
if err != nil {
t.Errorf("expected valid RFC3339 format, got error: %v", err)
}
if time.Since(parsed) > time.Minute {
t.Error("expected To to be approximately now")
}
},
},
{
name: "default To when empty string",
given: SensorDataRequest{
SensorID: "temp-001",
From: ptr(validFrom),
To: ptr(""),
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.To == nil {
t.Error("expected To to be set with default value")
return
}
parsed, err := time.Parse(time.RFC3339, *req.To)
if err != nil {
t.Errorf("expected valid RFC3339 format, got error: %v", err)
}
if time.Since(parsed) > time.Minute {
t.Error("expected To to be approximately now")
}
},
},
{
name: "default From when nil",
given: SensorDataRequest{
SensorID: "temp-001",
From: nil,
To: ptr(validTo),
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.From == nil {
t.Error("expected From to be set with default value")
return
}
parsed, err := time.Parse(time.RFC3339, *req.From)
if err != nil {
t.Errorf("expected valid RFC3339 format, got error: %v", err)
}
expectedFrom := time.Now().AddDate(0, 0, -7)
diff := expectedFrom.Sub(parsed)
if diff > time.Hour || diff < -time.Hour {
t.Errorf("expected From to be approximately 7 days ago, got %v", parsed)
}
},
},
{
name: "default From when empty string",
given: SensorDataRequest{
SensorID: "temp-001",
From: ptr(""),
To: ptr(validTo),
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.From == nil {
t.Error("expected From to be set with default value")
return
}
parsed, err := time.Parse(time.RFC3339, *req.From)
if err != nil {
t.Errorf("expected valid RFC3339 format, got error: %v", err)
}
expectedFrom := time.Now().AddDate(0, 0, -7)
diff := expectedFrom.Sub(parsed)
if diff > time.Hour || diff < -time.Hour {
t.Errorf("expected From to be approximately 7 days ago, got %v", parsed)
}
},
},
{
name: "invalid From format sets default",
given: SensorDataRequest{
SensorID: "temp-001",
From: ptr("invalid-date"),
To: ptr(validTo),
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.From == nil {
t.Error("expected From to be set with default value")
return
}
parsed, err := time.Parse(time.RFC3339, *req.From)
if err != nil {
t.Errorf("expected valid RFC3339 format after correction, got error: %v", err)
}
expectedFrom := time.Now().AddDate(0, 0, -7)
diff := expectedFrom.Sub(parsed)
if diff > time.Hour || diff < -time.Hour {
t.Errorf("expected From to be approximately 7 days ago after correction, got %v", parsed)
}
},
},
{
name: "invalid To format sets default",
given: SensorDataRequest{
SensorID: "temp-001",
From: ptr(validFrom),
To: ptr("not-a-date"),
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.To == nil {
t.Error("expected To to be set with default value")
return
}
parsed, err := time.Parse(time.RFC3339, *req.To)
if err != nil {
t.Errorf("expected valid RFC3339 format after correction, got error: %v", err)
}
if time.Since(parsed) > time.Minute {
t.Error("expected To to be approximately now after correction")
}
},
},
{
name: "all defaults when From and To are nil",
given: SensorDataRequest{
SensorID: "temp-001",
From: nil,
To: nil,
},
expecErr: false,
checkFn: func(t *testing.T, req SensorDataRequest) {
if req.From == nil || req.To == nil {
t.Error("expected both From and To to be set with defaults")
return
}
parsedFrom, err := time.Parse(time.RFC3339, *req.From)
if err != nil {
t.Errorf("expected valid RFC3339 format for From, got error: %v", err)
}
parsedTo, err := time.Parse(time.RFC3339, *req.To)
if err != nil {
t.Errorf("expected valid RFC3339 format for To, got error: %v", err)
}
expectedFrom := time.Now().AddDate(0, 0, -7)
diff := expectedFrom.Sub(parsedFrom)
if diff > time.Hour || diff < -time.Hour {
t.Errorf("expected From to be approximately 7 days ago, got %v", parsedFrom)
}
if time.Since(parsedTo) > time.Minute {
t.Error("expected To to be approximately now")
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.given.Validate()
if tt.expecErr && err == nil {
t.Errorf("expected error, got nil")
return
}
if !tt.expecErr && err != nil {
t.Errorf("unexpected error: %v", err)
return
}
if tt.expecErr {
return
}
if tt.checkFn != nil {
tt.checkFn(t, tt.given)
}
})
}
}
func ptr[T any](v T) *T {
return &v
}

View File

@ -2,9 +2,7 @@ package sensors
import ( import (
"encoding/json" "encoding/json"
"log/slog"
"nats-app/internal/iot" "nats-app/internal/iot"
"time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
@ -15,45 +13,25 @@ const (
subjectSensorsGet = "sensors.get" subjectSensorsGet = "sensors.get"
subjectSensorsValuesGet = "sensors.values.get" subjectSensorsValuesGet = "sensors.values.get"
subjectSensorsList = "sensors.list" subjectSensorsList = "sensors.list"
subjectSensorsData = "sensors.data."
) )
type Handlers struct { type Handlers struct {
service *Service service *Service
*iot.IoTDevice *iot.IoTDevice
simulator *Simulator
} }
func NewHandlers(service *Service, iot *iot.IoTDevice) *Handlers { func NewHandlers(service *Service, iot *iot.IoTDevice) *Handlers {
simulator := Start(iot.NATS)
activeSensors, err := service.repo.ReadAllSensors()
if err != nil {
slog.Error("reading all sensors", "error", err)
}
for _, sensor := range activeSensors {
go simulator.SimulateSensor(sensor)
slog.Info("started simulator for sensor", "sensor_id", sensor.SensorID)
}
sensors, _ := service.repo.ReadAllSensors()
slog.Info("sensors", "sens", sensors)
return &Handlers{ return &Handlers{
service: service, service: service,
IoTDevice: iot, IoTDevice: iot,
simulator: simulator,
} }
} }
func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, error)) { func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, error)) {
var req Req var req Req
if len(msg.Data) > 0 { if err := json.Unmarshal(msg.Data, &req); err != nil {
if err := json.Unmarshal(msg.Data, &req); err != nil { msg.Respond([]byte(`{"error":"invalid request"}`))
msg.Respond([]byte(`{"error":"invalid request"}`)) return
return
}
} }
result, err := handler(req) result, err := handler(req)
@ -66,23 +44,8 @@ func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, erro
msg.Respond(response) msg.Respond(response)
} }
func handlePublish[Req any](msg *nats.Msg, handler func(Req) error) {
var req Req
if len(msg.Data) > 0 {
if err := json.Unmarshal(msg.Data, &req); err != nil {
slog.Error("failed to unmarshal message", "error", err)
return
}
}
if err := handler(req); err != nil {
slog.Error("handler error", "error", err)
}
}
func (h *Handlers) SetupEndpoints() *Handlers { func (h *Handlers) SetupEndpoints() *Handlers {
h.register() h.register()
h.registerData()
h.update() h.update()
h.get() h.get()
h.getValues() h.getValues()
@ -93,55 +56,17 @@ func (h *Handlers) SetupEndpoints() *Handlers {
func (h *Handlers) register() { func (h *Handlers) register() {
h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) { h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) {
handleRequest(msg, func(req Sensor) (Sensor, error) { handleRequest(msg, func(req Sensor) (Sensor, error) {
if err := req.Validate(); err != nil { // service layer
slog.Error("error validating sensor", "error", err)
return Sensor{}, err
}
if err := h.service.RegisterSensor(req); err != nil {
return Sensor{}, err
}
go h.simulator.SimulateSensor(req)
return req, nil return req, nil
}) })
}) })
} }
func (h *Handlers) registerData() {
h.NATS.Subscribe(subjectSensorsData+"*", func(msg *nats.Msg) {
handlePublish(msg, func(data SensorData) error {
if err := data.Validate(); err != nil {
slog.Error("error validating sensor data", "error", err)
return err
}
if err := h.service.RegisterSensorData(data); err != nil {
slog.Error("failed to save sensor data", "error", err, "sensor_id", data.SensorID)
return err
}
slog.Debug("sensor data saved", "sensor_id", data.SensorID, "value", data.Value)
return nil
})
})
}
func (h *Handlers) update() { func (h *Handlers) update() {
h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) { h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) {
handleRequest(msg, func(req Sensor) (Sensor, error) { handleRequest(msg, func(req Sensor) (Sensor, error) {
slog.Debug("calling sensor.update", "payload", req) // service layer
if err := req.Validate(); err != nil {
return Sensor{}, err
}
if err := h.service.UpdateSensor(req); err != nil {
return Sensor{}, err
}
h.simulator.UpdateSensor(req)
return req, nil return req, nil
}) })
@ -150,36 +75,26 @@ func (h *Handlers) update() {
func (h *Handlers) get() { func (h *Handlers) get() {
h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) { h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) {
handleRequest(msg, func(req SensorRequest) (Sensor, error) { handleRequest(msg, func(req struct {
slog.Debug("calling sensor.get", "payload", req) SensorID string `json:"sensor_id"`
}) (Sensor, error) {
// service layer
if err := req.Validate(); err != nil { return Sensor{}, nil
return Sensor{}, err
}
return h.service.GetSensor(req.SensorID)
}) })
}) })
} }
func (h *Handlers) getValues() { func (h *Handlers) getValues() {
h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) { h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) {
handleRequest(msg, func(req SensorDataRequest) ([]SensorData, error) { handleRequest(msg, func(req struct {
if err := req.Validate(); err != nil { SensorID string `json:"sensor_id"`
return []SensorData{}, err From string `json:"from"`
} To string `json:"to"`
}) ([]SensorData, error) {
// service layer
from, err := time.Parse(time.RFC3339, *req.From) return []SensorData{}, nil
if err != nil {
return []SensorData{}, err
}
to, err := time.Parse(time.RFC3339, *req.To)
if err != nil {
return []SensorData{}, err
}
return h.service.GetValues(req.SensorID, from, to)
}) })
}) })
} }
@ -187,7 +102,9 @@ func (h *Handlers) getValues() {
func (h *Handlers) list() { func (h *Handlers) list() {
h.NATS.Subscribe(subjectSensorsList, func(msg *nats.Msg) { h.NATS.Subscribe(subjectSensorsList, func(msg *nats.Msg) {
handleRequest(msg, func(req struct{}) ([]Sensor, error) { handleRequest(msg, func(req struct{}) ([]Sensor, error) {
return h.service.ListSensors() // service layer
return []Sensor{}, nil
}) })
}) })
} }

View File

@ -1,115 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: nats-app/internal/domains/sensors (interfaces: Repository)
//
// Generated by this command:
//
// mockgen -package mock -destination internal/domains/sensors/mock/querier.go nats-app/internal/domains/sensors Repository
//
// Package mock is a generated GoMock package.
package mock
import (
sensors "nats-app/internal/domains/sensors"
reflect "reflect"
time "time"
gomock "go.uber.org/mock/gomock"
)
// MockRepository is a mock of Repository interface.
type MockRepository struct {
ctrl *gomock.Controller
recorder *MockRepositoryMockRecorder
isgomock struct{}
}
// MockRepositoryMockRecorder is the mock recorder for MockRepository.
type MockRepositoryMockRecorder struct {
mock *MockRepository
}
// NewMockRepository creates a new mock instance.
func NewMockRepository(ctrl *gomock.Controller) *MockRepository {
mock := &MockRepository{ctrl: ctrl}
mock.recorder = &MockRepositoryMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockRepository) EXPECT() *MockRepositoryMockRecorder {
return m.recorder
}
// CreateSensor mocks base method.
func (m *MockRepository) CreateSensor(s sensors.Sensor) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateSensor", s)
ret0, _ := ret[0].(error)
return ret0
}
// CreateSensor indicates an expected call of CreateSensor.
func (mr *MockRepositoryMockRecorder) CreateSensor(s any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSensor", reflect.TypeOf((*MockRepository)(nil).CreateSensor), s)
}
// ReadAllSensors mocks base method.
func (m *MockRepository) ReadAllSensors() ([]sensors.Sensor, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadAllSensors")
ret0, _ := ret[0].([]sensors.Sensor)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ReadAllSensors indicates an expected call of ReadAllSensors.
func (mr *MockRepositoryMockRecorder) ReadAllSensors() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadAllSensors", reflect.TypeOf((*MockRepository)(nil).ReadAllSensors))
}
// ReadSensor mocks base method.
func (m *MockRepository) ReadSensor(sensorID string) (sensors.Sensor, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadSensor", sensorID)
ret0, _ := ret[0].(sensors.Sensor)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ReadSensor indicates an expected call of ReadSensor.
func (mr *MockRepositoryMockRecorder) ReadSensor(sensorID any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadSensor", reflect.TypeOf((*MockRepository)(nil).ReadSensor), sensorID)
}
// ReadSensorValues mocks base method.
func (m *MockRepository) ReadSensorValues(sensorID string, from, to time.Time) ([]sensors.SensorData, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ReadSensorValues", sensorID, from, to)
ret0, _ := ret[0].([]sensors.SensorData)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ReadSensorValues indicates an expected call of ReadSensorValues.
func (mr *MockRepositoryMockRecorder) ReadSensorValues(sensorID, from, to any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadSensorValues", reflect.TypeOf((*MockRepository)(nil).ReadSensorValues), sensorID, from, to)
}
// UpdateSensor mocks base method.
func (m *MockRepository) UpdateSensor(s sensors.Sensor) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateSensor", s)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateSensor indicates an expected call of UpdateSensor.
func (mr *MockRepositoryMockRecorder) UpdateSensor(s any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSensor", reflect.TypeOf((*MockRepository)(nil).UpdateSensor), s)
}

View File

@ -15,26 +15,15 @@ const (
) )
type Sensor struct { type Sensor struct {
SensorID string `json:"sensor_id"` SensorID string `json:"sensor_id"`
SensorType SType `json:"sensor_type"` SensorType SType `json:"sensor_type"`
SamplingInterval *time.Duration `json:"sampling"` SamplingInterval time.Duration `json:"sampling"`
ThresholdAbove *float64 `json:"thresoldabove"` ThresholdAbove float64 `json:"thresoldabove"`
ThresholdBelow *float64 `json:"thresoldbelow"` ThresholdBelow float64 `json:"thresoldbelow"`
SensorData *map[int]SensorData `json:"sensor_data,omitempty"` SensorData *[]SensorData `json:"sensor_data,omitempty"`
} }
type SensorData struct { type SensorData struct {
SensorID string `json:"sensor_id"` Value float64 `json:"value"`
Value *float64 `json:"value"` Timestamp time.Time `json:"timestamp"`
Timestamp *time.Time `json:"timestamp"`
}
type SensorRequest struct {
SensorID string `json:"sensor_id"`
}
type SensorDataRequest struct {
SensorID string `json:"sensor_id"`
From *string `json:"from"`
To *string `json:"to"`
} }

View File

@ -1,7 +1,6 @@
package sensors package sensors
import ( import (
"context"
"log/slog" "log/slog"
"sync" "sync"
"time" "time"
@ -10,114 +9,41 @@ import (
) )
type Repository interface { type Repository interface {
CreateSensor(s Sensor) error RegisterSensor(s Sensor) error
CreateSensorData(data SensorData) error UpdateSensorConfig(s Sensor) error
UpdateSensor(s Sensor) error ReadSensor(id int) (Sensor, error)
ReadSensor(sensorID string) (Sensor, error) ReadSensorValues(id int, from, to time.Time) ([]SensorData, error)
ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error)
ReadAllSensors() ([]Sensor, error) ReadAllSensors() ([]Sensor, error)
} }
type pgxRepo struct { type pgxRepo struct {
*pgxpool.Pool pool *pgxpool.Pool
} }
func newPGXRepo(pool *pgxpool.Pool) Repository { func newPGXRepo(pool *pgxpool.Pool) Repository {
return &pgxRepo{ return &pgxRepo{
pool, pool: pool,
} }
} }
const createSensorQuery = `insert into sensors (sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below) values ($1, $2, $3, $4, $5)` func (p *pgxRepo) ReadSensor(id int) (Sensor, error) {
panic("unimplemented")
func (p *pgxRepo) CreateSensor(s Sensor) error {
_, err := p.Exec(context.Background(), createSensorQuery, s.SensorID, string(s.SensorType), s.SamplingInterval, s.ThresholdAbove, s.ThresholdBelow)
return err
} }
const createSensorDataQuery = `insert into registry (sensor_id, value, created_at) values ($1, $2, $3)` func (p *pgxRepo) UpdateSensorConfig(s Sensor) error {
panic("unimplemented")
func (p *pgxRepo) CreateSensorData(s SensorData) error {
_, err := p.Exec(context.Background(), createSensorDataQuery, s.SensorID, s.Value, s.Timestamp)
return err
} }
const updateSensorQuery = `update sensors set sensor_type = $1, sampling_interval = $2, threshold_above = $3, threshold_below = $4 where sensor_id = $5` func (p *pgxRepo) RegisterSensor(s Sensor) error {
panic("unimplemented")
func (p *pgxRepo) UpdateSensor(s Sensor) error {
_, err := p.Exec(context.Background(), updateSensorQuery, string(s.SensorType), s.SamplingInterval, s.ThresholdAbove, s.ThresholdBelow, s.SensorID)
return err
} }
const readSensorBySensorID = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors where sensor_id = $1` func (p *pgxRepo) ReadSensorValues(id int, from time.Time, to time.Time) ([]SensorData, error) {
panic("unimplemented")
func (p *pgxRepo) ReadSensor(sensorID string) (Sensor, error) {
var s Sensor
err := p.QueryRow(context.Background(), readSensorBySensorID, sensorID).Scan(
&s.SensorID,
&s.SensorType,
&s.SamplingInterval,
&s.ThresholdAbove,
&s.ThresholdBelow,
)
if err != nil {
return Sensor{}, ErrSensorNotFound
}
return s, nil
} }
const readSensorValuesBySensorID = `select sensor_id, value, created_at from registry where sensor_id = $1 and created_at >= $2 and created_at <= $3 order by created_at desc`
func (p *pgxRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) {
rows, err := p.Query(context.Background(), readSensorValuesBySensorID, sensorID, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
data := []SensorData{}
for rows.Next() {
var sd SensorData
if err := rows.Scan(&sd.SensorID, &sd.Value, &sd.Timestamp); err != nil {
return nil, err
}
data = append(data, sd)
}
if err := rows.Err(); err != nil {
return nil, err
}
return data, nil
}
const readAllSensorsQuery = `select sensor_id, sensor_type, sampling_interval, threshold_above, threshold_below from sensors order by created_at desc`
func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) { func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) {
rows, err := p.Query(context.Background(), readAllSensorsQuery) panic("unimplemented")
if err != nil {
return nil, err
}
defer rows.Close()
sensors := []Sensor{}
for rows.Next() {
var s Sensor
if err := rows.Scan(
&s.SensorID,
&s.SensorType,
&s.SamplingInterval,
&s.ThresholdAbove,
&s.ThresholdBelow,
); err != nil {
return nil, err
}
sensors = append(sensors, s)
}
if err := rows.Err(); err != nil {
return nil, err
}
return sensors, nil
} }
type inMemory struct { type inMemory struct {
@ -128,116 +54,29 @@ type inMemory struct {
func newInMemoryRepo() Repository { func newInMemoryRepo() Repository {
return &inMemory{ return &inMemory{
sensors: make(map[string]*Sensor), sensors: make(map[string]*Sensor),
mu: &sync.Mutex{},
} }
} }
func (i *inMemory) CreateSensor(s Sensor) error { func (i *inMemory) RegisterSensor(s Sensor) error {
i.mu.Lock() panic("unimplemented")
defer i.mu.Unlock()
if _, exists := i.sensors[s.SensorID]; exists {
return ErrSensorAlreadyExists
}
sensorCopy := s
i.sensors[s.SensorID] = &sensorCopy
return nil
} }
func (i *inMemory) CreateSensorData(data SensorData) error { func (i *inMemory) UpdateSensorConfig(s Sensor) error {
i.mu.Lock() panic("unimplemented")
defer i.mu.Unlock()
sensor, exists := i.sensors[data.SensorID]
if !exists {
return ErrSensorNotFound
}
if sensor.SensorData == nil {
sensor.SensorData = &map[int]SensorData{}
}
key := int(data.Timestamp.Unix())
(*sensor.SensorData)[key] = data
if len(*sensor.SensorData) > 100 {
oldestKey := key
for k := range *sensor.SensorData {
if k < oldestKey {
oldestKey = k
}
}
delete(*sensor.SensorData, oldestKey)
}
return nil
} }
func (i *inMemory) UpdateSensor(s Sensor) error { func (i *inMemory) ReadSensor(id int) (Sensor, error) {
i.mu.Lock() panic("unimplemented")
defer i.mu.Unlock()
sensor, exists := i.sensors[s.SensorID]
if !exists {
return ErrSensorNotFound
}
sensor.SensorType = s.SensorType
sensor.SamplingInterval = s.SamplingInterval
sensor.ThresholdAbove = s.ThresholdAbove
sensor.ThresholdBelow = s.ThresholdBelow
return nil
} }
func (i *inMemory) ReadSensor(sensorID string) (Sensor, error) { func (i *inMemory) ReadSensorValues(id int, from time.Time, to time.Time) ([]SensorData, error) {
i.mu.Lock() // holds only last 100 values for every sensor
defer i.mu.Unlock()
sensor, exists := i.sensors[sensorID] panic("unimplemented")
if !exists {
return Sensor{}, ErrSensorNotFound
}
return *sensor, nil
}
func (i *inMemory) ReadSensorValues(sensorID string, from time.Time, to time.Time) ([]SensorData, error) {
i.mu.Lock()
defer i.mu.Unlock()
sensor, exists := i.sensors[sensorID]
if !exists {
return nil, ErrSensorNotFound
}
if sensor.SensorData == nil {
return []SensorData{}, nil
}
data := []SensorData{}
fromUnix := from.Unix()
toUnix := to.Unix()
for timestamp, sd := range *sensor.SensorData {
if int64(timestamp) >= fromUnix && int64(timestamp) <= toUnix {
data = append(data, sd)
}
}
return data, nil
} }
func (i *inMemory) ReadAllSensors() ([]Sensor, error) { func (i *inMemory) ReadAllSensors() ([]Sensor, error) {
i.mu.Lock() panic("unimplemented")
defer i.mu.Unlock()
sensors := make([]Sensor, 0, len(i.sensors))
for _, s := range i.sensors {
sensors = append(sensors, *s)
}
return sensors, nil
} }
type DecoratorRepo struct { type DecoratorRepo struct {
@ -256,7 +95,7 @@ func NewDecoratorRepo(pool *pgxpool.Pool) Repository {
} }
for _, s := range sensors { for _, s := range sensors {
_ = memory.CreateSensor(s) _ = memory.RegisterSensor(s)
} }
return &DecoratorRepo{ return &DecoratorRepo{
@ -265,78 +104,47 @@ func NewDecoratorRepo(pool *pgxpool.Pool) Repository {
} }
} }
func (d *DecoratorRepo) CreateSensor(s Sensor) error { func (d *DecoratorRepo) RegisterSensor(s Sensor) error {
if err := d.db.CreateSensor(s); err != nil { if err := d.db.RegisterSensor(s); err != nil {
return err return err
} }
_ = d.memory.CreateSensor(s) _ = d.memory.RegisterSensor(s)
return nil return nil
} }
func (d *DecoratorRepo) CreateSensorData(data SensorData) error { func (d *DecoratorRepo) UpdateSensorConfig(s Sensor) error {
if err := d.db.CreateSensorData(data); err != nil { if err := d.db.UpdateSensorConfig(s); err != nil {
return err return err
} }
_ = d.memory.CreateSensorData(data) _ = d.memory.UpdateSensorConfig(s)
return nil return nil
} }
func (d *DecoratorRepo) UpdateSensor(s Sensor) error { func (d *DecoratorRepo) ReadSensor(id int) (Sensor, error) {
if err := d.db.UpdateSensor(s); err != nil { sensor, err := d.memory.ReadSensor(id)
return err
}
_ = d.memory.UpdateSensor(s)
return nil
}
func (d *DecoratorRepo) ReadSensor(sensorID string) (Sensor, error) {
sensor, err := d.memory.ReadSensor(sensorID)
if err == nil { if err == nil {
return sensor, nil return sensor, nil
} }
return d.db.ReadSensor(sensorID) return d.db.ReadSensor(id)
} }
func (d *DecoratorRepo) ReadSensorValues(sensorID string, from, to time.Time) ([]SensorData, error) { func (d *DecoratorRepo) ReadSensorValues(id int, from, to time.Time) ([]SensorData, error) {
values, err := d.memory.ReadSensorValues(sensorID, from, to) values, err := d.memory.ReadSensorValues(id, from, to)
if err != nil || len(values) == 0 { if err == nil && len(values) > 0 {
return d.db.ReadSensorValues(sensorID, from, to) return values, nil
} }
var oldestTimestamp *time.Time return d.db.ReadSensorValues(id, from, to)
for _, v := range values {
if oldestTimestamp == nil || v.Timestamp.Before(*oldestTimestamp) {
oldestTimestamp = v.Timestamp
}
}
if oldestTimestamp != nil && oldestTimestamp.After(from) {
return d.db.ReadSensorValues(sensorID, from, to)
}
return values, nil
} }
func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) { func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {
var sensors []Sensor
sensors, err := d.memory.ReadAllSensors() sensors, err := d.memory.ReadAllSensors()
if err == nil && len(sensors) > 0 { if err == nil && len(sensors) > 0 {
return sensors, nil return sensors, nil
} }
sensors, err = d.db.ReadAllSensors() return d.db.ReadAllSensors()
if err != nil {
return []Sensor{}, err
}
for _, s := range sensors {
_ = d.memory.CreateSensor(s)
}
return sensors, nil
} }

View File

@ -1,10 +1,5 @@
package sensors package sensors
import (
"log/slog"
"time"
)
type Service struct { type Service struct {
repo Repository repo Repository
} }
@ -14,40 +9,3 @@ func NewService(repo Repository) *Service {
repo: repo, repo: repo,
} }
} }
func (s *Service) RegisterSensor(sensor Sensor) error {
err := s.repo.CreateSensor(sensor)
if err != nil {
slog.Error("error registering sensor", "error", err)
return err
}
return nil
}
func (s *Service) RegisterSensorData(data SensorData) error {
err := s.repo.CreateSensorData(data)
if err != nil {
slog.Error("error registering sensor data")
return err
}
return nil
}
func (s *Service) UpdateSensor(sensor Sensor) error {
return s.repo.UpdateSensor(sensor)
}
func (s *Service) GetSensor(sensorID string) (Sensor, error) {
return s.repo.ReadSensor(sensorID)
}
func (s *Service) GetValues(sensorID string, from, to time.Time) ([]SensorData, error) {
return s.repo.ReadSensorValues(sensorID, from, to)
}
func (s *Service) ListSensors() ([]Sensor, error) {
return s.repo.ReadAllSensors()
}

View File

@ -1,111 +1,3 @@
package sensors package sensors
import ( type Simulator struct{}
"encoding/json"
"log/slog"
"math/rand"
"nats-app/internal/broker"
"sync"
"time"
)
type Simulator struct {
*broker.NATS
stopChannels map[string]chan bool
mu sync.Mutex
}
func Start(nats *broker.NATS) *Simulator {
return &Simulator{
NATS: nats,
stopChannels: make(map[string]chan bool),
}
}
func (s *Simulator) SimulateSensor(sensor Sensor) {
s.mu.Lock()
stopChan := make(chan bool)
s.stopChannels[sensor.SensorID] = stopChan
s.mu.Unlock()
ticker := time.NewTicker(*sensor.SamplingInterval * time.Second)
defer ticker.Stop()
for {
select {
case <-stopChan:
slog.Info("stopping simulator for sensor", "sensor_id", sensor.SensorID)
return
case <-ticker.C:
data := s.generateData(sensor)
if data.SensorID == "" {
slog.Warn("sensor data generation failed", "sensor_id", sensor.SensorID)
continue
}
payload, err := json.Marshal(data)
if err != nil {
slog.Error("failed to marshal sensor data", "error", err, "sensor_id", sensor.SensorID)
continue
}
subject := subjectSensorsData + sensor.SensorID
if err := s.Publish(subject, payload); err != nil {
slog.Error("failed to publish sensor data", "error", err, "subject", subject)
} else {
slog.Debug("sensor data published", "sensor_id", sensor.SensorID, "value", data.Value)
}
}
}
}
func (s *Simulator) UpdateSensor(sensor Sensor) {
s.mu.Lock()
stopChan, exists := s.stopChannels[sensor.SensorID]
s.mu.Unlock()
if exists {
stopChan <- true
s.mu.Lock()
delete(s.stopChannels, sensor.SensorID)
s.mu.Unlock()
}
go s.SimulateSensor(sensor)
slog.Info("simulator updated for sensor", "sensor_id", sensor.SensorID, "new_interval", sensor.SamplingInterval)
}
func (s *Simulator) generateData(sensor Sensor) SensorData {
now := time.Now()
data := SensorData{
SensorID: sensor.SensorID,
Timestamp: &now,
}
if rand.Float64() < 0.05 {
return SensorData{}
}
var value float64
switch sensor.SensorType {
case Temperature:
value = -20 + rand.Float64()*100
case Humidity:
value = 10 + rand.Float64()*90
case CarbonDioxide:
value = 980 + rand.Float64()*60
case Pressure:
value = 950 + rand.Float64()*100
case Proximity:
value = rand.Float64() * 400
case Light:
value = rand.Float64() * 10000
default:
value = rand.Float64() * 100
}
data.Value = &value
return data
}