refactor to add ingest batchs for better traceability

This commit is contained in:
Pedro Pérez 2025-10-23 18:11:26 +02:00
parent 1f0eb18a5f
commit fb4d31afba
6 changed files with 104 additions and 23 deletions

View File

@ -7,8 +7,10 @@ DB_NAME := meteologica
# Remove and create a development database.
dockerize-db:
docker rm -f $(DB_NAME)
docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d postgres:$(PG_VERSION)
sleep 10
docker run --name $(DB_NAME) -e POSTGRES_PASSWORD=secret -e POSTGRES_USER=developer -e POSTGRES_DB=$(DB_NAME) -p 5432:5432 -d postgres:$(PG_VERSION)
@echo "waiting for db to be ready"
@until docker exec $(DB_NAME) pg_isready -U developer -d $(DB_NAME) > /dev/null 2>&1; do sleep 1; done
@echo "db is ready"
make migrateup
.PHONY: migrateup

View File

@ -22,6 +22,7 @@ type RejectedMeteoData struct {
}
type FileStats struct {
BatchID int `json:"batch_id"`
RowsInserted int `json:"rows_inserted"`
RowsRejected int `json:"rows_rejected"`
ElapsedMS int `json:"elapsed_ms"`

View File

@ -59,6 +59,7 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) {
return
}
fileStats.ElapsedMS = int(time.Since(start).Milliseconds())
h.UpdateElapsedMS(r.Context(), fileStats.BatchID, fileStats.ElapsedMS)
slog.Info("CSV file processed",
"filename", header.Filename,

View File

@ -8,8 +8,21 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)
type InsertMeteoDataParams struct {
FileChecksum string
Accepted []MeteoData
Rejected []RejectedMeteoData
}
type InsertMeteoDataResult struct {
BatchID int
AcceptedCount int
RejectedCount int
}
type Repository interface {
InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error)
InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error)
UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error
}
type pgxRepo struct {
@ -22,33 +35,53 @@ func NewPGXRepo(pool *pgxpool.Pool) Repository {
}
}
const insertAcceptedMeteoData = `insert into public.meteo_data (location_name, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id`
func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error) {
func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error) {
tx, err := pgx.Begin(ctx)
if err != nil {
return 0, 0, fmt.Errorf("error starting transaction: %w", err)
return nil, fmt.Errorf("error starting transaction: %w", err)
}
defer tx.Rollback(ctx)
acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, accepted)
batchID, err := pgx.insertBatch(ctx, tx, params.FileChecksum)
if err != nil {
return 0, 0, err
return nil, err
}
rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, rejected)
acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, batchID, params.Accepted)
if err != nil {
return 0, 0, err
return nil, err
}
rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, batchID, params.Rejected)
if err != nil {
return nil, err
}
if err = tx.Commit(ctx); err != nil {
return 0, 0, fmt.Errorf("error committing transaction: %w", err)
return nil, fmt.Errorf("error committing transaction: %w", err)
}
return acceptedCount, rejectedCount, nil
return &InsertMeteoDataResult{
BatchID: batchID,
AcceptedCount: acceptedCount,
RejectedCount: rejectedCount,
}, nil
}
func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data []MeteoData) (int, error) {
const insertBatch = `insert into public.ingest_batch (elapsed_ms, file_checksum) values ($1, $2) returning id`
func (pgx *pgxRepo) insertBatch(ctx context.Context, tx b.Tx, fileChecksum string) (int, error) {
var batchID int
err := tx.QueryRow(ctx, insertBatch, 0, fileChecksum).Scan(&batchID)
if err != nil {
return 0, fmt.Errorf("error inserting batch: %w", err)
}
return batchID, nil
}
const insertAcceptedMeteoData = `insert into public.meteo_data (batch_id, location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness) values ($1, $2, $3, $4, $5, $6, $7) returning id`
func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []MeteoData) (int, error) {
if len(data) == 0 {
return 0, nil
}
@ -56,7 +89,7 @@ func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data [
batch := &b.Batch{}
for _, d := range data {
batch.Queue(insertAcceptedMeteoData, d.Location, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp)
batch.Queue(insertAcceptedMeteoData, batchID, d.Location, d.Timestamp, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness)
}
results := tx.SendBatch(ctx, batch)
@ -76,9 +109,9 @@ func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data [
return rowsInserted, nil
}
const insertRejectedMeteoData = `insert into public.rejected_data (raw_data, reason) values ($1, $2) returning id`
const insertRejectedMeteoData = `insert into public.rejected_data (batch_id, raw_data, reason) values ($1, $2, $3) returning id`
func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data []RejectedMeteoData) (int, error) {
func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []RejectedMeteoData) (int, error) {
if len(data) == 0 {
return 0, nil
}
@ -86,7 +119,7 @@ func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data [
batch := &b.Batch{}
for _, d := range data {
batch.Queue(insertRejectedMeteoData, d.RowValue, d.Reason)
batch.Queue(insertRejectedMeteoData, batchID, d.RowValue, d.Reason)
}
results := tx.SendBatch(ctx, batch)
@ -105,3 +138,13 @@ func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data [
return rowsInserted, nil
}
const updateBatchElapsedTime = `update public.ingest_batch set elapsed_ms = $1 where id = $2`
func (pgx *pgxRepo) UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error {
_, err := pgx.Exec(ctx, updateBatchElapsedTime, elapsedMS, batchID)
if err != nil {
return fmt.Errorf("error updating batch elapsed time: %w", err)
}
return nil
}

View File

@ -24,11 +24,27 @@ func (s *Service) IngestCSV(ctx context.Context, r io.Reader, fs *FileStats) err
return err
}
fs.RowsInserted, fs.RowsRejected, err = s.repo.InsertMeteoDataTX(ctx, accepted, rejected)
result, err := s.repo.InsertMeteoDataTX(ctx, InsertMeteoDataParams{
FileChecksum: fs.FileChecksum,
Accepted: accepted,
Rejected: rejected,
})
if err != nil {
slog.Error("error inserting meteo data", "err", err)
return err
}
fs.BatchID = result.BatchID
fs.RowsInserted = result.AcceptedCount
fs.RowsRejected = result.RejectedCount
return nil
}
func (s *Service) UpdateElapsedMS(ctx context.Context, batchID, elapsedMS int) error {
if err := s.repo.UpdateBatchElapsedTime(ctx, batchID, elapsedMS); err != nil {
slog.Error("error updating batch elapsed time", "err", err)
}
return nil
}

View File

@ -1,16 +1,32 @@
create extension citext; -- noqa
create table public.ingest_batch
(
id serial primary key,
elapsed_ms int not null default 0,
file_checksum text not null,
created_at timestamp not null default now()
);
create index idx_ingest_batch_file_checksum on public.ingest_batch (
file_checksum
);
create table public.meteo_data
(
id serial primary key,
batch_id int not null references public.ingest_batch (id),
location_name citext not null,
max_temp numeric(5,2) not null,
min_temp numeric(5,2) not null,
rainfall numeric(5,2) not null,
date_of_register date not null,
max_temp numeric(5, 2) not null,
min_temp numeric(5, 2) not null,
rainfall numeric(5, 2) not null,
cloudiness int not null,
created_at date not null default now()
created_at timestamp not null default now()
);
create index idx_meteo_data_location_name on public.meteo_data (location_name);
@ -18,6 +34,8 @@ create index idx_meteo_data_location_name on public.meteo_data (location_name);
create table public.rejected_data
(
id serial primary key,
batch_id int not null references public.ingest_batch (id),
raw_data text not null,
reason text default null,