From b51f00e49972deb83b33aa46497480d160951862 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20P=C3=A9rez?= Date: Thu, 9 Oct 2025 05:20:06 +0200 Subject: [PATCH] boilerplate code --- Makefile | 4 +- README.md | 25 ++++- app/database/001_sensors.up.sql | 37 ++++++ app/main.go | 13 ++- go.mod | 18 +++ go.sum | 38 +++++++ internal/app/app.go | 11 -- internal/broker/nats.go | 22 ++++ internal/domains/sensors/domain.go | 1 + internal/domains/sensors/handlers.go | 110 ++++++++++++++++++ internal/domains/sensors/models.go | 29 +++++ internal/domains/sensors/repository.go | 150 +++++++++++++++++++++++++ internal/domains/sensors/service.go | 11 ++ internal/domains/sensors/simulator.go | 3 + internal/iot/db.go | 24 ++++ internal/iot/iot.go | 19 ++++ internal/{app => iot}/logger.go | 2 +- 17 files changed, 500 insertions(+), 17 deletions(-) create mode 100644 app/database/001_sensors.up.sql delete mode 100644 internal/app/app.go create mode 100644 internal/broker/nats.go create mode 100644 internal/domains/sensors/domain.go create mode 100644 internal/domains/sensors/handlers.go create mode 100644 internal/domains/sensors/models.go create mode 100644 internal/domains/sensors/repository.go create mode 100644 internal/domains/sensors/service.go create mode 100644 internal/domains/sensors/simulator.go create mode 100644 internal/iot/db.go create mode 100644 internal/iot/iot.go rename internal/{app => iot}/logger.go (98%) diff --git a/Makefile b/Makefile index 8765f88..6986569 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ GO ?= go -PG_VERSION := 17.6-alpine3.22 +TIMESCALE_VERSION := pg17 MOD_NAME := nats-app DB_NAME := nats-db NATS_NAME := nats-sv @@ -9,7 +9,7 @@ NATS_VERSION := 2.12.0-alpine3.22 # Remove and create a development database. dockerize-db: docker rm -f $(DB_NAME) - docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d postgres:$(PG_VERSION) + docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d timescale/timescaledb-ha:$(TIMESCALE_VERSION) .PHONY: dockerize-nats # Remove and create a NATS server. diff --git a/README.md b/README.md index 360f79e..6f415d1 100644 --- a/README.md +++ b/README.md @@ -33,4 +33,27 @@ lo cual significa que se está suscribiendo al canal `hello`. Y en otra escribir `nats pub "hello" "Hola mundo!"`, lo cual significa que está escribiendo el mensaje `Hola mundo!` en el canal `hello`. -![demo de NATS](./assets/nats-demo-1.png) \ No newline at end of file +![demo de NATS](./assets/nats-demo-1.png) + +### Creación de la aplicación + +La parte fácil es hacer todo el _boilerplate_, hecho en solo _commit_, que no es +lo ideal pero lo dicho, es puro relleno, mucho código ya viene escrito de la +librería `gopher-toolbox`. En este punto el proyecto compila pero hay error en +tiempo de ejecución por la falta de implementaciones en el repositorio. + +### Empezando por el repositorio + +El almacenamiento de los datos se ha optado por el uso de `TimescaleDB`, una +extensión de `PostgreSQL`, el motivo es porque ya he trabajado mucho con ese +motor y tengo los _drivers_ ya escritos. Tengo entendido que está optimizado +para trabajar con grandes cantidades de datos de series temporales, lo que viene +siendo valores de sensores por ejemplo. + +Por otro lado también hay un sistema de caché muy rudimentario, en memoria que +es un mapa de valores. + +Para el registro de valores y mantener ambos se ha usado el patrón decorador que +bajo un mismo _struct_ se incluye las dos implementaciones y se llama a ambas +funciones. Desde la capa servicios sólo tiene que llamar al decorador sin saber +los detalles de la implementación. diff --git a/app/database/001_sensors.up.sql b/app/database/001_sensors.up.sql new file mode 100644 index 0000000..f4a4203 --- /dev/null +++ b/app/database/001_sensors.up.sql @@ -0,0 +1,37 @@ +create type sensor_type as enum ( + 'temperature', + 'humidity', + 'carbon_dioxide', + 'pressure', + 'proximity', + 'light' + ); + +create table sensors +( + sensor_id varchar(255) primary key, + + sensor_type sensor_type not null, + sampling_interval int not null default 3600, + + threshold_above float not null default 100, + threshold_below float not null default 0, + + created_at timestamp not null default now(), + updated_at timestamp not null default now() +); + +create index idx_sensors_sensor_id on sensors (sensor_id); + +create table registry +( + sensor_id int not null references sensors (id), + + value float not null, + created_at timestamp not null default now() +) + with ( + timescaledb.hypertable, + timescaledb.partition_column = 'created_at', + timescaledb.segmentby = 'sensor_id' + ); \ No newline at end of file diff --git a/app/main.go b/app/main.go index bd2b9b5..d5388a7 100644 --- a/app/main.go +++ b/app/main.go @@ -3,17 +3,26 @@ package main import ( "flag" "log/slog" - "nats-app/internal/app" + "nats-app/internal/domains/sensors" + "nats-app/internal/iot" ) func main() { environment := flag.String("env", "dev", "dev or prod") flag.Parse() - _ = app.NewApp(*environment) + pool := iot.NewPGXPool("postgres://developer:secret@localhost:5432/nats-db?sslmode=disable") + + iotDevice := iot.Start(*environment, "nats://localhost:4222") + + repo := sensors.NewDecoratorRepo(pool) + sensorsService := sensors.NewService(repo) + _ = sensors.NewHandlers(sensorsService, iotDevice).SetupEndpoints() slog.Debug("hello world debug") slog.Info("Hello world info") slog.Warn("Hello world warn") slog.Error("hello world error") + + select {} } diff --git a/go.mod b/go.mod index fe4a28d..2edd904 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,21 @@ module nats-app go 1.25.1 + +require ( + github.com/jackc/pgx/v5 v5.7.6 + github.com/nats-io/nats.go v1.46.1 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sync v0.13.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect +) diff --git a/go.sum b/go.sum index e69de29..55928ca 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo= +github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/app/app.go b/internal/app/app.go deleted file mode 100644 index 7ca991b..0000000 --- a/internal/app/app.go +++ /dev/null @@ -1,11 +0,0 @@ -package app - -type App struct { -} - -func NewApp(environment string) *App { - - startRotativeLogger(environment) - - return &App{} -} diff --git a/internal/broker/nats.go b/internal/broker/nats.go new file mode 100644 index 0000000..ab3dec5 --- /dev/null +++ b/internal/broker/nats.go @@ -0,0 +1,22 @@ +package broker + +import ( + "log/slog" + + "github.com/nats-io/nats.go" +) + +type NATS struct { + *nats.Conn +} + +func NewNATS(url string) *NATS { + conn, err := nats.Connect(url) + if err != nil { + slog.Error("cannot stablise a connection to server", "error", err) + } + + return &NATS{ + Conn: conn, + } +} diff --git a/internal/domains/sensors/domain.go b/internal/domains/sensors/domain.go new file mode 100644 index 0000000..3b3fa2b --- /dev/null +++ b/internal/domains/sensors/domain.go @@ -0,0 +1 @@ +package sensors diff --git a/internal/domains/sensors/handlers.go b/internal/domains/sensors/handlers.go new file mode 100644 index 0000000..f160525 --- /dev/null +++ b/internal/domains/sensors/handlers.go @@ -0,0 +1,110 @@ +package sensors + +import ( + "encoding/json" + "nats-app/internal/iot" + + "github.com/nats-io/nats.go" +) + +const ( + subjectSensorsRegister = "sensors.register" + subjectSensorsUpdate = "sensors.update" + subjectSensorsGet = "sensors.get" + subjectSensorsValuesGet = "sensors.values.get" + subjectSensorsList = "sensors.list" +) + +type Handlers struct { + service *Service + *iot.IoTDevice +} + +func NewHandlers(service *Service, iot *iot.IoTDevice) *Handlers { + return &Handlers{ + service: service, + IoTDevice: iot, + } +} + +func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, error)) { + var req Req + if err := json.Unmarshal(msg.Data, &req); err != nil { + msg.Respond([]byte(`{"error":"invalid request"}`)) + return + } + + result, err := handler(req) + if err != nil { + msg.Respond([]byte(`{"error":"` + err.Error() + `"}`)) + return + } + + response, _ := json.Marshal(result) + msg.Respond(response) +} + +func (h *Handlers) SetupEndpoints() *Handlers { + h.register() + h.update() + h.get() + h.getValues() + h.list() + return h +} + +func (h *Handlers) register() { + h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) { + handleRequest(msg, func(req Sensor) (Sensor, error) { + // service layer + + return req, nil + }) + }) +} + +func (h *Handlers) update() { + h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) { + handleRequest(msg, func(req Sensor) (Sensor, error) { + // service layer + + return req, nil + }) + }) +} + +func (h *Handlers) get() { + h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) { + handleRequest(msg, func(req struct { + SensorID string `json:"sensor_id"` + }) (Sensor, error) { + // service layer + + return Sensor{}, nil + }) + }) +} + +func (h *Handlers) getValues() { + h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) { + handleRequest(msg, func(req struct { + SensorID string `json:"sensor_id"` + From string `json:"from"` + To string `json:"to"` + }) ([]SensorData, error) { + // service layer + + return []SensorData{}, nil + }) + }) +} + +func (h *Handlers) list() { + h.NATS.Subscribe(subjectSensorsList, func(msg *nats.Msg) { + handleRequest(msg, func(req struct{}) ([]Sensor, error) { + // service layer + + return []Sensor{}, nil + }) + }) +} diff --git a/internal/domains/sensors/models.go b/internal/domains/sensors/models.go new file mode 100644 index 0000000..daabde3 --- /dev/null +++ b/internal/domains/sensors/models.go @@ -0,0 +1,29 @@ +package sensors + +import "time" + +type SType string + +const ( + Temperature SType = "temperature" + Humidity SType = "humidity" + CarbonDioxide SType = "carbon_dioxide" + Pressure SType = "pressure" + Proximity SType = "proximity" + Light SType = "light" + // and more... +) + +type Sensor struct { + SensorID string `json:"sensor_id"` + SensorType SType `json:"sensor_type"` + SamplingInterval time.Duration `json:"sampling"` + ThresholdAbove float64 `json:"thresoldabove"` + ThresholdBelow float64 `json:"thresoldbelow"` + SensorData *[]SensorData `json:"sensor_data,omitempty"` +} + +type SensorData struct { + Value float64 `json:"value"` + Timestamp time.Time `json:"timestamp"` +} diff --git a/internal/domains/sensors/repository.go b/internal/domains/sensors/repository.go new file mode 100644 index 0000000..4e10196 --- /dev/null +++ b/internal/domains/sensors/repository.go @@ -0,0 +1,150 @@ +package sensors + +import ( + "log/slog" + "sync" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type Repository interface { + RegisterSensor(s Sensor) error + UpdateSensorConfig(s Sensor) error + ReadSensor(id int) (Sensor, error) + ReadSensorValues(id int, from, to time.Time) ([]SensorData, error) + ReadAllSensors() ([]Sensor, error) +} + +type pgxRepo struct { + pool *pgxpool.Pool +} + +func newPGXRepo(pool *pgxpool.Pool) Repository { + return &pgxRepo{ + pool: pool, + } +} + +func (p *pgxRepo) ReadSensor(id int) (Sensor, error) { + panic("unimplemented") +} + +func (p *pgxRepo) UpdateSensorConfig(s Sensor) error { + panic("unimplemented") +} + +func (p *pgxRepo) RegisterSensor(s Sensor) error { + panic("unimplemented") +} + +func (p *pgxRepo) ReadSensorValues(id int, from time.Time, to time.Time) ([]SensorData, error) { + panic("unimplemented") +} + +func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) { + panic("unimplemented") +} + +type inMemory struct { + sensors map[string]*Sensor + mu *sync.Mutex +} + +func newInMemoryRepo() Repository { + return &inMemory{ + sensors: make(map[string]*Sensor), + } +} + +func (i *inMemory) RegisterSensor(s Sensor) error { + panic("unimplemented") +} + +func (i *inMemory) UpdateSensorConfig(s Sensor) error { + panic("unimplemented") +} + +func (i *inMemory) ReadSensor(id int) (Sensor, error) { + panic("unimplemented") +} + +func (i *inMemory) ReadSensorValues(id int, from time.Time, to time.Time) ([]SensorData, error) { + // holds only last 100 values for every sensor + + panic("unimplemented") +} + +func (i *inMemory) ReadAllSensors() ([]Sensor, error) { + panic("unimplemented") +} + +type DecoratorRepo struct { + db Repository + memory Repository +} + +func NewDecoratorRepo(pool *pgxpool.Pool) Repository { + + db := newPGXRepo(pool) + memory := newInMemoryRepo() + + sensors, err := db.ReadAllSensors() + if err != nil { + slog.Error("error warming up cache") + } + + for _, s := range sensors { + _ = memory.RegisterSensor(s) + } + + return &DecoratorRepo{ + db: db, + memory: memory, + } +} + +func (d *DecoratorRepo) RegisterSensor(s Sensor) error { + if err := d.db.RegisterSensor(s); err != nil { + return err + } + + _ = d.memory.RegisterSensor(s) + return nil +} + +func (d *DecoratorRepo) UpdateSensorConfig(s Sensor) error { + if err := d.db.UpdateSensorConfig(s); err != nil { + return err + } + + _ = d.memory.UpdateSensorConfig(s) + return nil +} + +func (d *DecoratorRepo) ReadSensor(id int) (Sensor, error) { + sensor, err := d.memory.ReadSensor(id) + if err == nil { + return sensor, nil + } + + return d.db.ReadSensor(id) +} + +func (d *DecoratorRepo) ReadSensorValues(id int, from, to time.Time) ([]SensorData, error) { + values, err := d.memory.ReadSensorValues(id, from, to) + if err == nil && len(values) > 0 { + return values, nil + } + + return d.db.ReadSensorValues(id, from, to) +} + +func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) { + sensors, err := d.memory.ReadAllSensors() + if err == nil && len(sensors) > 0 { + return sensors, nil + } + + return d.db.ReadAllSensors() +} diff --git a/internal/domains/sensors/service.go b/internal/domains/sensors/service.go new file mode 100644 index 0000000..a8edb14 --- /dev/null +++ b/internal/domains/sensors/service.go @@ -0,0 +1,11 @@ +package sensors + +type Service struct { + repo Repository +} + +func NewService(repo Repository) *Service { + return &Service{ + repo: repo, + } +} diff --git a/internal/domains/sensors/simulator.go b/internal/domains/sensors/simulator.go new file mode 100644 index 0000000..76dbfc2 --- /dev/null +++ b/internal/domains/sensors/simulator.go @@ -0,0 +1,3 @@ +package sensors + +type Simulator struct{} diff --git a/internal/iot/db.go b/internal/iot/db.go new file mode 100644 index 0000000..14da8e4 --- /dev/null +++ b/internal/iot/db.go @@ -0,0 +1,24 @@ +package iot + +import ( + "context" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + _ "github.com/jackc/pgx/v5/stdlib" +) + +func NewPGXPool(datasource string) *pgxpool.Pool { + dbPool, err := pgxpool.New(context.Background(), datasource) + if err != nil { + slog.Error("error connecting to database", "error", err, "datasource", datasource) + panic(err) + } + + if err := dbPool.Ping(context.Background()); err != nil { + slog.Error("error pinging database, maybe incorrect datasource", "error", err, "datasource", datasource) + panic(err) + } + slog.Info("connected to database", "datasource", datasource) + return dbPool +} diff --git a/internal/iot/iot.go b/internal/iot/iot.go new file mode 100644 index 0000000..b331ada --- /dev/null +++ b/internal/iot/iot.go @@ -0,0 +1,19 @@ +package iot + +import ( + "nats-app/internal/broker" +) + +type IoTDevice struct { + NATS *broker.NATS +} + +func Start(environment, url string) *IoTDevice { + + startRotativeLogger(environment) + nats := broker.NewNATS(url) + + return &IoTDevice{ + NATS: nats, + } +} diff --git a/internal/app/logger.go b/internal/iot/logger.go similarity index 98% rename from internal/app/logger.go rename to internal/iot/logger.go index de4def4..6ff0be7 100644 --- a/internal/app/logger.go +++ b/internal/iot/logger.go @@ -1,4 +1,4 @@ -package app +package iot import ( "fmt"