boilerplate code

This commit is contained in:
Pedro Pérez 2025-10-09 05:20:06 +02:00
parent a7c436fe4c
commit b51f00e499
17 changed files with 500 additions and 17 deletions

View File

@ -1,5 +1,5 @@
GO ?= go
PG_VERSION := 17.6-alpine3.22
TIMESCALE_VERSION := pg17
MOD_NAME := nats-app
DB_NAME := nats-db
NATS_NAME := nats-sv
@ -9,7 +9,7 @@ NATS_VERSION := 2.12.0-alpine3.22
# Remove and create a development database.
dockerize-db:
docker rm -f $(DB_NAME)
docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d postgres:$(PG_VERSION)
docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d timescale/timescaledb-ha:$(TIMESCALE_VERSION)
.PHONY: dockerize-nats
# Remove and create a NATS server.

View File

@ -33,4 +33,27 @@ lo cual significa que se está suscribiendo al canal `hello`. Y en otra escribir
`nats pub "hello" "Hola mundo!"`, lo cual significa que está escribiendo el
mensaje `Hola mundo!` en el canal `hello`.
![demo de NATS](./assets/nats-demo-1.png)
![demo de NATS](./assets/nats-demo-1.png)
### Creación de la aplicación
La parte fácil es hacer todo el _boilerplate_, hecho en solo _commit_, que no es
lo ideal pero lo dicho, es puro relleno, mucho código ya viene escrito de la
librería `gopher-toolbox`. En este punto el proyecto compila pero hay error en
tiempo de ejecución por la falta de implementaciones en el repositorio.
### Empezando por el repositorio
El almacenamiento de los datos se ha optado por el uso de `TimescaleDB`, una
extensión de `PostgreSQL`, el motivo es porque ya he trabajado mucho con ese
motor y tengo los _drivers_ ya escritos. Tengo entendido que está optimizado
para trabajar con grandes cantidades de datos de series temporales, lo que viene
siendo valores de sensores por ejemplo.
Por otro lado también hay un sistema de caché muy rudimentario, en memoria que
es un mapa de valores.
Para el registro de valores y mantener ambos se ha usado el patrón decorador que
bajo un mismo _struct_ se incluye las dos implementaciones y se llama a ambas
funciones. Desde la capa servicios sólo tiene que llamar al decorador sin saber
los detalles de la implementación.

View File

@ -0,0 +1,37 @@
create type sensor_type as enum (
'temperature',
'humidity',
'carbon_dioxide',
'pressure',
'proximity',
'light'
);
create table sensors
(
sensor_id varchar(255) primary key,
sensor_type sensor_type not null,
sampling_interval int not null default 3600,
threshold_above float not null default 100,
threshold_below float not null default 0,
created_at timestamp not null default now(),
updated_at timestamp not null default now()
);
create index idx_sensors_sensor_id on sensors (sensor_id);
create table registry
(
sensor_id int not null references sensors (id),
value float not null,
created_at timestamp not null default now()
)
with (
timescaledb.hypertable,
timescaledb.partition_column = 'created_at',
timescaledb.segmentby = 'sensor_id'
);

View File

@ -3,17 +3,26 @@ package main
import (
"flag"
"log/slog"
"nats-app/internal/app"
"nats-app/internal/domains/sensors"
"nats-app/internal/iot"
)
func main() {
environment := flag.String("env", "dev", "dev or prod")
flag.Parse()
_ = app.NewApp(*environment)
pool := iot.NewPGXPool("postgres://developer:secret@localhost:5432/nats-db?sslmode=disable")
iotDevice := iot.Start(*environment, "nats://localhost:4222")
repo := sensors.NewDecoratorRepo(pool)
sensorsService := sensors.NewService(repo)
_ = sensors.NewHandlers(sensorsService, iotDevice).SetupEndpoints()
slog.Debug("hello world debug")
slog.Info("Hello world info")
slog.Warn("Hello world warn")
slog.Error("hello world error")
select {}
}

18
go.mod
View File

@ -1,3 +1,21 @@
module nats-app
go 1.25.1
require (
github.com/jackc/pgx/v5 v5.7.6
github.com/nats-io/nats.go v1.46.1
)
require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
)

38
go.sum
View File

@ -0,0 +1,38 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo=
github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,11 +0,0 @@
package app
type App struct {
}
func NewApp(environment string) *App {
startRotativeLogger(environment)
return &App{}
}

22
internal/broker/nats.go Normal file
View File

@ -0,0 +1,22 @@
package broker
import (
"log/slog"
"github.com/nats-io/nats.go"
)
type NATS struct {
*nats.Conn
}
func NewNATS(url string) *NATS {
conn, err := nats.Connect(url)
if err != nil {
slog.Error("cannot stablise a connection to server", "error", err)
}
return &NATS{
Conn: conn,
}
}

View File

@ -0,0 +1 @@
package sensors

View File

@ -0,0 +1,110 @@
package sensors
import (
"encoding/json"
"nats-app/internal/iot"
"github.com/nats-io/nats.go"
)
const (
subjectSensorsRegister = "sensors.register"
subjectSensorsUpdate = "sensors.update"
subjectSensorsGet = "sensors.get"
subjectSensorsValuesGet = "sensors.values.get"
subjectSensorsList = "sensors.list"
)
type Handlers struct {
service *Service
*iot.IoTDevice
}
func NewHandlers(service *Service, iot *iot.IoTDevice) *Handlers {
return &Handlers{
service: service,
IoTDevice: iot,
}
}
func handleRequest[Req any, Res any](msg *nats.Msg, handler func(Req) (Res, error)) {
var req Req
if err := json.Unmarshal(msg.Data, &req); err != nil {
msg.Respond([]byte(`{"error":"invalid request"}`))
return
}
result, err := handler(req)
if err != nil {
msg.Respond([]byte(`{"error":"` + err.Error() + `"}`))
return
}
response, _ := json.Marshal(result)
msg.Respond(response)
}
func (h *Handlers) SetupEndpoints() *Handlers {
h.register()
h.update()
h.get()
h.getValues()
h.list()
return h
}
func (h *Handlers) register() {
h.NATS.Subscribe(subjectSensorsRegister, func(msg *nats.Msg) {
handleRequest(msg, func(req Sensor) (Sensor, error) {
// service layer
return req, nil
})
})
}
func (h *Handlers) update() {
h.NATS.Subscribe(subjectSensorsUpdate, func(msg *nats.Msg) {
handleRequest(msg, func(req Sensor) (Sensor, error) {
// service layer
return req, nil
})
})
}
func (h *Handlers) get() {
h.NATS.Subscribe(subjectSensorsGet, func(msg *nats.Msg) {
handleRequest(msg, func(req struct {
SensorID string `json:"sensor_id"`
}) (Sensor, error) {
// service layer
return Sensor{}, nil
})
})
}
func (h *Handlers) getValues() {
h.NATS.Subscribe(subjectSensorsValuesGet, func(msg *nats.Msg) {
handleRequest(msg, func(req struct {
SensorID string `json:"sensor_id"`
From string `json:"from"`
To string `json:"to"`
}) ([]SensorData, error) {
// service layer
return []SensorData{}, nil
})
})
}
func (h *Handlers) list() {
h.NATS.Subscribe(subjectSensorsList, func(msg *nats.Msg) {
handleRequest(msg, func(req struct{}) ([]Sensor, error) {
// service layer
return []Sensor{}, nil
})
})
}

View File

@ -0,0 +1,29 @@
package sensors
import "time"
type SType string
const (
Temperature SType = "temperature"
Humidity SType = "humidity"
CarbonDioxide SType = "carbon_dioxide"
Pressure SType = "pressure"
Proximity SType = "proximity"
Light SType = "light"
// and more...
)
type Sensor struct {
SensorID string `json:"sensor_id"`
SensorType SType `json:"sensor_type"`
SamplingInterval time.Duration `json:"sampling"`
ThresholdAbove float64 `json:"thresoldabove"`
ThresholdBelow float64 `json:"thresoldbelow"`
SensorData *[]SensorData `json:"sensor_data,omitempty"`
}
type SensorData struct {
Value float64 `json:"value"`
Timestamp time.Time `json:"timestamp"`
}

View File

@ -0,0 +1,150 @@
package sensors
import (
"log/slog"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Repository interface {
RegisterSensor(s Sensor) error
UpdateSensorConfig(s Sensor) error
ReadSensor(id int) (Sensor, error)
ReadSensorValues(id int, from, to time.Time) ([]SensorData, error)
ReadAllSensors() ([]Sensor, error)
}
type pgxRepo struct {
pool *pgxpool.Pool
}
func newPGXRepo(pool *pgxpool.Pool) Repository {
return &pgxRepo{
pool: pool,
}
}
func (p *pgxRepo) ReadSensor(id int) (Sensor, error) {
panic("unimplemented")
}
func (p *pgxRepo) UpdateSensorConfig(s Sensor) error {
panic("unimplemented")
}
func (p *pgxRepo) RegisterSensor(s Sensor) error {
panic("unimplemented")
}
func (p *pgxRepo) ReadSensorValues(id int, from time.Time, to time.Time) ([]SensorData, error) {
panic("unimplemented")
}
func (p *pgxRepo) ReadAllSensors() ([]Sensor, error) {
panic("unimplemented")
}
type inMemory struct {
sensors map[string]*Sensor
mu *sync.Mutex
}
func newInMemoryRepo() Repository {
return &inMemory{
sensors: make(map[string]*Sensor),
}
}
func (i *inMemory) RegisterSensor(s Sensor) error {
panic("unimplemented")
}
func (i *inMemory) UpdateSensorConfig(s Sensor) error {
panic("unimplemented")
}
func (i *inMemory) ReadSensor(id int) (Sensor, error) {
panic("unimplemented")
}
func (i *inMemory) ReadSensorValues(id int, from time.Time, to time.Time) ([]SensorData, error) {
// holds only last 100 values for every sensor
panic("unimplemented")
}
func (i *inMemory) ReadAllSensors() ([]Sensor, error) {
panic("unimplemented")
}
type DecoratorRepo struct {
db Repository
memory Repository
}
func NewDecoratorRepo(pool *pgxpool.Pool) Repository {
db := newPGXRepo(pool)
memory := newInMemoryRepo()
sensors, err := db.ReadAllSensors()
if err != nil {
slog.Error("error warming up cache")
}
for _, s := range sensors {
_ = memory.RegisterSensor(s)
}
return &DecoratorRepo{
db: db,
memory: memory,
}
}
func (d *DecoratorRepo) RegisterSensor(s Sensor) error {
if err := d.db.RegisterSensor(s); err != nil {
return err
}
_ = d.memory.RegisterSensor(s)
return nil
}
func (d *DecoratorRepo) UpdateSensorConfig(s Sensor) error {
if err := d.db.UpdateSensorConfig(s); err != nil {
return err
}
_ = d.memory.UpdateSensorConfig(s)
return nil
}
func (d *DecoratorRepo) ReadSensor(id int) (Sensor, error) {
sensor, err := d.memory.ReadSensor(id)
if err == nil {
return sensor, nil
}
return d.db.ReadSensor(id)
}
func (d *DecoratorRepo) ReadSensorValues(id int, from, to time.Time) ([]SensorData, error) {
values, err := d.memory.ReadSensorValues(id, from, to)
if err == nil && len(values) > 0 {
return values, nil
}
return d.db.ReadSensorValues(id, from, to)
}
func (d *DecoratorRepo) ReadAllSensors() ([]Sensor, error) {
sensors, err := d.memory.ReadAllSensors()
if err == nil && len(sensors) > 0 {
return sensors, nil
}
return d.db.ReadAllSensors()
}

View File

@ -0,0 +1,11 @@
package sensors
type Service struct {
repo Repository
}
func NewService(repo Repository) *Service {
return &Service{
repo: repo,
}
}

View File

@ -0,0 +1,3 @@
package sensors
type Simulator struct{}

24
internal/iot/db.go Normal file
View File

@ -0,0 +1,24 @@
package iot
import (
"context"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/jackc/pgx/v5/stdlib"
)
func NewPGXPool(datasource string) *pgxpool.Pool {
dbPool, err := pgxpool.New(context.Background(), datasource)
if err != nil {
slog.Error("error connecting to database", "error", err, "datasource", datasource)
panic(err)
}
if err := dbPool.Ping(context.Background()); err != nil {
slog.Error("error pinging database, maybe incorrect datasource", "error", err, "datasource", datasource)
panic(err)
}
slog.Info("connected to database", "datasource", datasource)
return dbPool
}

19
internal/iot/iot.go Normal file
View File

@ -0,0 +1,19 @@
package iot
import (
"nats-app/internal/broker"
)
type IoTDevice struct {
NATS *broker.NATS
}
func Start(environment, url string) *IoTDevice {
startRotativeLogger(environment)
nats := broker.NewNATS(url)
return &IoTDevice{
NATS: nats,
}
}

View File

@ -1,4 +1,4 @@
package app
package iot
import (
"fmt"