diff --git a/.gitignore b/.gitignore index 7bee8aa..74aac89 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .sqlfluff *.out +apuntes.md diff --git a/service_a/go.mod b/service_a/go.mod index 24622c1..6189b53 100644 --- a/service_a/go.mod +++ b/service_a/go.mod @@ -2,10 +2,21 @@ module servicea go 1.25.2 -require github.com/stretchr/testify v1.11.1 +require ( + github.com/jackc/pgx/v5 v5.7.6 + github.com/stretchr/testify v1.11.1 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sync v0.13.0 // indirect + golang.org/x/text v0.24.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/service_a/go.sum b/service_a/go.sum index c4c1710..6b1b3b9 100644 --- a/service_a/go.sum +++ b/service_a/go.sum @@ -1,10 +1,37 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/service_a/internal/app/db.go b/service_a/internal/app/db.go new file mode 100644 index 0000000..09577e6 --- /dev/null +++ b/service_a/internal/app/db.go @@ -0,0 +1,24 @@ +package app + +import ( + "context" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + _ "github.com/jackc/pgx/v5/stdlib" +) + +func NewPGXPool(datasource string) *pgxpool.Pool { + dbPool, err := pgxpool.New(context.Background(), datasource) + if err != nil { + slog.Error("error connecting to database", "error", err, "datasource", datasource) + panic(err) + } + + if err := dbPool.Ping(context.Background()); err != nil { + slog.Error("error pinging database, maybe incorrect datasource", "error", err, "datasource", datasource) + panic(err) + } + slog.Info("connected to database", "datasource", datasource) + return dbPool +} diff --git a/service_a/internal/domains/meteo/file.go b/service_a/internal/domains/meteo/file.go index 33c317b..1b51446 100644 --- a/service_a/internal/domains/meteo/file.go +++ b/service_a/internal/domains/meteo/file.go @@ -30,14 +30,14 @@ func (mt *MeteoData) validate() error { } type FileIngest interface { - Parse(io io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoData, error) + Parse(io io.Reader) ([]MeteoData, []RejectedMeteoData, error) } type CSV struct{} var _ FileIngest = (*CSV)(nil) -func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoData, error) { +func (c *CSV) Parse(r io.Reader) ([]MeteoData, []RejectedMeteoData, error) { reader := csv.NewReader(r) reader.Comma = ';' reader.TrimLeadingSpace = true @@ -74,7 +74,6 @@ func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoDat meteoData, err := normalize(record) if err != nil { - fs.RowsRejected++ rejectedDataList = append(rejectedDataList, RejectedMeteoData{ RowValue: rowValue, Reason: err.Error(), @@ -83,7 +82,6 @@ func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoDat } if err := meteoData.validate(); err != nil { - fs.RowsRejected++ rejectedDataList = append(rejectedDataList, RejectedMeteoData{ RowValue: rowValue, Reason: err.Error(), @@ -92,7 +90,6 @@ func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoDat } meteoDataList = append(meteoDataList, *meteoData) - fs.RowsInserted++ } return meteoDataList, rejectedDataList, nil diff --git a/service_a/internal/domains/meteo/file_test.go b/service_a/internal/domains/meteo/file_test.go index e754907..7f07ab6 100644 --- a/service_a/internal/domains/meteo/file_test.go +++ b/service_a/internal/domains/meteo/file_test.go @@ -128,7 +128,7 @@ func Test_CSV_ParseFile(t *testing.T) { csvIngest := &meteo.CSV{} fileStats := &meteo.FileStats{} - inserted, rejected, err := csvIngest.Parse(file, fileStats) + inserted, rejected, err := csvIngest.Parse(file) assert.NoError(t, err) assert.Equal(t, tt.expectedInserted, fileStats.RowsInserted) diff --git a/service_a/internal/domains/meteo/handlers.go b/service_a/internal/domains/meteo/handlers.go index 769306e..b71dbf0 100644 --- a/service_a/internal/domains/meteo/handlers.go +++ b/service_a/internal/domains/meteo/handlers.go @@ -13,12 +13,12 @@ import ( ) type Handler struct { - csv *CSV + *Service } -func NewHandler() *Handler { +func NewHandler(service *Service) *Handler { return &Handler{ - csv: &CSV{}, + Service: service, } } @@ -50,8 +50,7 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) { } start := time.Now() - - inserted, rejected, err := h.csv.Parse(bytes.NewReader(content), fileStats) + err = h.Service.IngestCSV(bytes.NewReader(content), fileStats) if err != nil { slog.Error(ErrCannotParseFile.Error(), "filename", header.Filename, @@ -59,7 +58,6 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) return } - fileStats.ElapsedMS = int(time.Since(start).Milliseconds()) slog.Info("CSV file processed", @@ -68,8 +66,6 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) { "rows_rejected", fileStats.RowsRejected, "elapsed_ms", fileStats.ElapsedMS, "file_checksum", fileStats.FileChecksum, - "inserted_data", inserted, - "rejected_data", rejected, ) w.Header().Set("Content-Type", "application/json") diff --git a/service_a/internal/domains/meteo/repository.go b/service_a/internal/domains/meteo/repository.go new file mode 100644 index 0000000..85cb6da --- /dev/null +++ b/service_a/internal/domains/meteo/repository.go @@ -0,0 +1,64 @@ +package meteo + +import ( + "context" + "fmt" + + b "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type Repository interface { + InsertAcceptedMeteoData(data []MeteoData) (int, error) + InsertRejectedMeteoData(data []RejectedMeteoData) (int, error) +} + +type pgxRepo struct { + *pgxpool.Pool +} + +func NewPGXRepo(pool *pgxpool.Pool) Repository { + return &pgxRepo{ + pool, + } +} + +const insertAcceptedMeteoData = `insert into public.meteo_data (location_id, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id` + +func (pgx *pgxRepo) InsertAcceptedMeteoData(data []MeteoData) (int, error) { + // TODO: pass context + // TODO improve transaction + tx, err := pgx.Begin(context.Background()) + if err != nil { + return 0, fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback(context.Background()) + + batch := &b.Batch{} + + for _, d := range data { + batch.Queue(insertAcceptedMeteoData, 1, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp) + } + + results := tx.SendBatch(context.Background(), batch) + + for i := range data { + _, err := results.Exec() + if err != nil { + results.Close() + return 0, fmt.Errorf("error executing batch command %d: %w", i, err) + } + } + + results.Close() + + if err = tx.Commit(context.Background()); err != nil { + return 0, fmt.Errorf("error committing transaction: %w", err) + } + + return 0, nil +} + +func (pgx *pgxRepo) InsertRejectedMeteoData(data []RejectedMeteoData) (int, error) { + return 0, nil +} diff --git a/service_a/internal/domains/meteo/service.go b/service_a/internal/domains/meteo/service.go new file mode 100644 index 0000000..156e5cd --- /dev/null +++ b/service_a/internal/domains/meteo/service.go @@ -0,0 +1,52 @@ +package meteo + +import ( + "io" + "log/slog" +) + +type Service struct { + csv CSV + repo Repository +} + +func NewService(repo Repository) *Service { + return &Service{ + csv: CSV{}, + repo: repo, + } +} + +func (s *Service) IngestCSV(r io.Reader, fs *FileStats) error { + accepted, rejected, err := s.csv.Parse(r) + if err != nil { + return err + } + + // TODO call insertToDB + s.insertAcceptedToDB(accepted, fs) + s.insertRejectedToDB(rejected, fs) + + return nil +} + +func (s *Service) insertAcceptedToDB(data []MeteoData, fs *FileStats) error { + var err error + fs.RowsInserted, err = s.repo.InsertAcceptedMeteoData(data) + if err != nil { + slog.Error("error", "err", err) + return err + } + + return nil +} + +func (s *Service) insertRejectedToDB(data []RejectedMeteoData, fs *FileStats) error { + var err error + fs.RowsRejected, err = s.repo.InsertRejectedMeteoData(data) + if err != nil { + return err + } + + return nil +} diff --git a/service_a/server/database/migrations/001_data.up.sql b/service_a/server/database/migrations/001_data.up.sql index 3934ecb..f8ba382 100644 --- a/service_a/server/database/migrations/001_data.up.sql +++ b/service_a/server/database/migrations/001_data.up.sql @@ -5,7 +5,7 @@ create table public.locations ); -create table public.temp_data +create table public.meteo_data ( id serial primary key, diff --git a/service_a/server/main.go b/service_a/server/main.go index d9a6133..ad8fedb 100644 --- a/service_a/server/main.go +++ b/service_a/server/main.go @@ -3,14 +3,19 @@ package main import ( "log/slog" "net/http" + "servicea/internal/app" "servicea/internal/domains/meteo" "servicea/internal/router" ) func main() { + pool := app.NewPGXPool("postgres://developer:secret@localhost:5432/meteologica?sslmode=disable") + mux := router.SetupRoutes() - meteoHandler := meteo.NewHandler() + meteoRepo := meteo.NewPGXRepo(pool) + meteoService := meteo.NewService(meteoRepo) + meteoHandler := meteo.NewHandler(meteoService) meteo.RegisterRoutes(mux, meteoHandler) slog.Info("server starting on :8080")