Review copy of the ingest-batch patch.

Text above the first "diff --git" header is commentary; git apply and patch skip it.

What the patch does:
  * Makefile: dockerize-db polls pg_isready instead of sleeping for 10 seconds.
  * Adds public.ingest_batch; meteo_data and rejected_data rows carry a batch_id.
  * InsertMeteoDataTX inserts the batch row and both row sets in one transaction.
    elapsed_ms is written as 0 there and set afterwards through UpdateBatchElapsedTime.

Amendments made during review:
  * Makefile: the readiness poll gives up after 30 attempts instead of looping forever,
    and the recursive call uses $(MAKE).
  * Service.UpdateElapsedMS returns the repository error instead of always returning nil;
    the handler logs it and carries on.
  * Doc comments on the new declarations; hunk headers were recomputed accordingly.

Caveats:
  * The line layout was rebuilt from a whitespace-collapsed copy. Indentation (tabs in Go
    and in Makefile recipes, four spaces in SQL) and blank lines were inferred from the
    hunk counts. TODO: confirm against the tree, or apply with --ignore-whitespace.
  * The "index" hashes are the original ones and do not describe the amended content.
  * The removed and added "docker run" lines look identical here; presumably they
    differed only in whitespace. TODO: confirm.

diff --git a/Makefile b/Makefile
index 324d6ac..d6de4e6 100644
--- a/Makefile
+++ b/Makefile
@@ -7,8 +7,10 @@ DB_NAME := meteologica
 # Remove and create a development database.
 dockerize-db:
 	docker rm -f $(DB_NAME)
-	docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d postgres:$(PG_VERSION)
-	sleep 10
+	docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d postgres:$(PG_VERSION)
+	@echo "waiting for db to be ready"
+	@i=0; until docker exec $(DB_NAME) pg_isready -U developer -d $(DB_NAME) > /dev/null 2>&1; do i=$$((i + 1)); if [ $$i -ge 30 ]; then echo "db did not become ready after 30 attempts" >&2; exit 1; fi; sleep 1; done
+	@echo "db is ready"
-	make migrateup
+	$(MAKE) migrateup
 
 .PHONY: migrateup
diff --git a/service_a/internal/domains/meteo/domain.go b/service_a/internal/domains/meteo/domain.go
index 0451df9..9ed3f87 100644
--- a/service_a/internal/domains/meteo/domain.go
+++ b/service_a/internal/domains/meteo/domain.go
@@ -22,6 +22,7 @@ type RejectedMeteoData struct {
 }
 
 type FileStats struct {
+	BatchID      int `json:"batch_id"`
 	RowsInserted int `json:"rows_inserted"`
 	RowsRejected int `json:"rows_rejected"`
 	ElapsedMS    int `json:"elapsed_ms"`
diff --git a/service_a/internal/domains/meteo/handlers.go b/service_a/internal/domains/meteo/handlers.go
index 66703c9..49d670e 100644
--- a/service_a/internal/domains/meteo/handlers.go
+++ b/service_a/internal/domains/meteo/handlers.go
@@ -59,6 +59,11 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	fileStats.ElapsedMS = int(time.Since(start).Milliseconds())
+	// Best-effort: a failure to record the elapsed time is logged and does
+	// not fail the request.
+	if err := h.UpdateElapsedMS(r.Context(), fileStats.BatchID, fileStats.ElapsedMS); err != nil {
+		slog.Error("error updating batch elapsed time", "batch_id", fileStats.BatchID, "err", err)
+	}
 
 	slog.Info("CSV file processed",
 		"filename", header.Filename,
diff --git a/service_a/internal/domains/meteo/repository.go b/service_a/internal/domains/meteo/repository.go
index 30ca398..5baaf33 100644
--- a/service_a/internal/domains/meteo/repository.go
+++ b/service_a/internal/domains/meteo/repository.go
@@ -8,8 +8,23 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 )
 
+// InsertMeteoDataParams is the input of Repository.InsertMeteoDataTX.
+type InsertMeteoDataParams struct {
+	FileChecksum string
+	Accepted     []MeteoData
+	Rejected     []RejectedMeteoData
+}
+
+// InsertMeteoDataResult holds the new batch id and the stored row counts.
+type InsertMeteoDataResult struct {
+	BatchID       int
+	AcceptedCount int
+	RejectedCount int
+}
+
 type Repository interface {
-	InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error)
+	InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error)
+	UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error
 }
 
 type pgxRepo struct {
@@ -22,33 +37,57 @@ func NewPGXRepo(pool *pgxpool.Pool) Repository {
 	}
 }
 
-const insertAcceptedMeteoData = `insert into public.meteo_data (location_name, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id`
-
-func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error) {
+// InsertMeteoDataTX creates a batch row and stores the accepted and rejected
+// rows under it in a single transaction.
+func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error) {
 	tx, err := pgx.Begin(ctx)
 	if err != nil {
-		return 0, 0, fmt.Errorf("error starting transaction: %w", err)
+		return nil, fmt.Errorf("error starting transaction: %w", err)
 	}
 	defer tx.Rollback(ctx)
 
-	acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, accepted)
+	batchID, err := pgx.insertBatch(ctx, tx, params.FileChecksum)
 	if err != nil {
-		return 0, 0, err
+		return nil, err
 	}
 
-	rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, rejected)
+	acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, batchID, params.Accepted)
 	if err != nil {
-		return 0, 0, err
+		return nil, err
+	}
+
+	rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, batchID, params.Rejected)
+	if err != nil {
+		return nil, err
 	}
 
 	if err = tx.Commit(ctx); err != nil {
-		return 0, 0, fmt.Errorf("error committing transaction: %w", err)
+		return nil, fmt.Errorf("error committing transaction: %w", err)
 	}
 
-	return acceptedCount, rejectedCount, nil
+	return &InsertMeteoDataResult{
+		BatchID:       batchID,
+		AcceptedCount: acceptedCount,
+		RejectedCount: rejectedCount,
+	}, nil
 }
 
-func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data []MeteoData) (int, error) {
+const insertBatch = `insert into public.ingest_batch (elapsed_ms, file_checksum) values ($1, $2) returning id`
+
+// insertBatch creates an ingest_batch row with the given checksum and returns its id.
+// elapsed_ms is written as 0 here; UpdateBatchElapsedTime sets the real value later.
+func (pgx *pgxRepo) insertBatch(ctx context.Context, tx b.Tx, fileChecksum string) (int, error) {
+	var batchID int
+	err := tx.QueryRow(ctx, insertBatch, 0, fileChecksum).Scan(&batchID)
+	if err != nil {
+		return 0, fmt.Errorf("error inserting batch: %w", err)
+	}
+	return batchID, nil
+}
+
+const insertAcceptedMeteoData = `insert into public.meteo_data (batch_id, location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness) values ($1, $2, $3, $4, $5, $6, $7) returning id`
+
+func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []MeteoData) (int, error) {
 	if len(data) == 0 {
 		return 0, nil
 	}
@@ -56,7 +95,7 @@ func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data [
 	batch := &b.Batch{}
 
 	for _, d := range data {
-		batch.Queue(insertAcceptedMeteoData, d.Location, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp)
+		batch.Queue(insertAcceptedMeteoData, batchID, d.Location, d.Timestamp, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness)
 	}
 
 	results := tx.SendBatch(ctx, batch)
@@ -76,9 +115,9 @@ func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data [
 	return rowsInserted, nil
 }
 
-const insertRejectedMeteoData = `insert into public.rejected_data (raw_data, reason) values ($1, $2) returning id`
+const insertRejectedMeteoData = `insert into public.rejected_data (batch_id, raw_data, reason) values ($1, $2, $3) returning id`
 
-func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data []RejectedMeteoData) (int, error) {
+func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []RejectedMeteoData) (int, error) {
 	if len(data) == 0 {
 		return 0, nil
 	}
@@ -86,7 +125,7 @@ func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data [
 	batch := &b.Batch{}
 
 	for _, d := range data {
-		batch.Queue(insertRejectedMeteoData, d.RowValue, d.Reason)
+		batch.Queue(insertRejectedMeteoData, batchID, d.RowValue, d.Reason)
 	}
 
 	results := tx.SendBatch(ctx, batch)
@@ -105,3 +144,14 @@ func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data [
 
 	return rowsInserted, nil
 }
+
+const updateBatchElapsedTime = `update public.ingest_batch set elapsed_ms = $1 where id = $2`
+
+// UpdateBatchElapsedTime sets elapsed_ms on the ingest_batch row with the given id.
+func (pgx *pgxRepo) UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error {
+	_, err := pgx.Exec(ctx, updateBatchElapsedTime, elapsedMS, batchID)
+	if err != nil {
+		return fmt.Errorf("error updating batch elapsed time: %w", err)
+	}
+	return nil
+}
diff --git a/service_a/internal/domains/meteo/service.go b/service_a/internal/domains/meteo/service.go
index a5db578..1758519 100644
--- a/service_a/internal/domains/meteo/service.go
+++ b/service_a/internal/domains/meteo/service.go
@@ -24,11 +24,25 @@ func (s *Service) IngestCSV(ctx context.Context, r io.Reader, fs *FileStats) err
 		return err
 	}
 
-	fs.RowsInserted, fs.RowsRejected, err = s.repo.InsertMeteoDataTX(ctx, accepted, rejected)
+	result, err := s.repo.InsertMeteoDataTX(ctx, InsertMeteoDataParams{
+		FileChecksum: fs.FileChecksum,
+		Accepted:     accepted,
+		Rejected:     rejected,
+	})
 	if err != nil {
 		slog.Error("error inserting meteo data", "err", err)
 		return err
 	}
 
+	fs.BatchID = result.BatchID
+	fs.RowsInserted = result.AcceptedCount
+	fs.RowsRejected = result.RejectedCount
+
 	return nil
 }
+
+// UpdateElapsedMS stores elapsedMS as the elapsed time of the given batch.
+// The repository error is returned as-is so the caller decides how to report it.
+func (s *Service) UpdateElapsedMS(ctx context.Context, batchID, elapsedMS int) error {
+	return s.repo.UpdateBatchElapsedTime(ctx, batchID, elapsedMS)
+}
diff --git a/service_a/server/database/migrations/001_data.up.sql b/service_a/server/database/migrations/001_data.up.sql
index 5e063b1..5572d92 100644
--- a/service_a/server/database/migrations/001_data.up.sql
+++ b/service_a/server/database/migrations/001_data.up.sql
@@ -1,16 +1,32 @@
 create extension citext; -- noqa
 
+create table public.ingest_batch
+(
+    id serial primary key,
+
+    elapsed_ms int not null default 0,
+    file_checksum text not null,
+
+    created_at timestamp not null default now()
+);
+
+create index idx_ingest_batch_file_checksum on public.ingest_batch (
+    file_checksum
+);
+
 create table public.meteo_data
 (
     id serial primary key,
+    batch_id int not null references public.ingest_batch (id),
 
     location_name citext not null,
-    max_temp numeric(5,2) not null,
-    min_temp numeric(5,2) not null,
-    rainfall numeric(5,2) not null,
+    date_of_register date not null,
+    max_temp numeric(5, 2) not null,
+    min_temp numeric(5, 2) not null,
+    rainfall numeric(5, 2) not null,
     cloudiness int not null,
 
-    created_at date not null default now()
+    created_at timestamp not null default now()
 );
 
 create index idx_meteo_data_location_name on public.meteo_data (location_name);
@@ -18,5 +34,7 @@ create index idx_meteo_data_location_name on public.meteo_data (location_name);
 create table public.rejected_data
 (
     id serial primary key,
+    batch_id int not null references public.ingest_batch (id),
+
     raw_data text not null,
     reason text default null,