From 062c7f3f39047cdd27d57c792c9ec5d56b57c8bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20P=C3=A9rez?= Date: Thu, 23 Oct 2025 17:38:51 +0200 Subject: [PATCH] merge to one transaction for accepted and rejected data --- .../internal/domains/meteo/repository.go | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/service_a/internal/domains/meteo/repository.go b/service_a/internal/domains/meteo/repository.go index 28be5b3..30ca398 100644 --- a/service_a/internal/domains/meteo/repository.go +++ b/service_a/internal/domains/meteo/repository.go @@ -9,8 +9,7 @@ import ( ) type Repository interface { - InsertAcceptedMeteoData(ctx context.Context, data []MeteoData) (int, error) - InsertRejectedMeteoData(ctx context.Context, data []RejectedMeteoData) (int, error) + InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error) } type pgxRepo struct { @@ -25,13 +24,35 @@ func NewPGXRepo(pool *pgxpool.Pool) Repository { const insertAcceptedMeteoData = `insert into public.meteo_data (location_name, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id` -func (pgx *pgxRepo) InsertAcceptedMeteoData(ctx context.Context, data []MeteoData) (int, error) { +func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error) { tx, err := pgx.Begin(ctx) if err != nil { - return 0, fmt.Errorf("error starting transaction: %w", err) + return 0, 0, fmt.Errorf("error starting transaction: %w", err) } defer tx.Rollback(ctx) + acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, accepted) + if err != nil { + return 0, 0, err + } + + rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, rejected) + if err != nil { + return 0, 0, err + } + + if err = tx.Commit(ctx); err != nil { + return 0, 0, fmt.Errorf("error committing transaction: %w", err) + } + + return acceptedCount, rejectedCount, nil +} + +func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data []MeteoData) (int, error) { + if len(data) == 0 { + return 0, nil + } + batch := &b.Batch{} for _, d := range data { @@ -40,8 +61,10 @@ func (pgx *pgxRepo) InsertAcceptedMeteoData(ctx context.Context, data []MeteoDat results := tx.SendBatch(ctx, batch) + var rowsInserted int for i := range data { _, err := results.Exec() + rowsInserted++ if err != nil { results.Close() return 0, fmt.Errorf("error executing batch command %d: %w", i, err) @@ -50,13 +73,35 @@ func (pgx *pgxRepo) InsertAcceptedMeteoData(ctx context.Context, data []MeteoDat results.Close() - if err = tx.Commit(ctx); err != nil { - return 0, fmt.Errorf("error committing transaction: %w", err) + return rowsInserted, nil +} + +const insertRejectedMeteoData = `insert into public.rejected_data (raw_data, reason) values ($1, $2) returning id` + +func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data []RejectedMeteoData) (int, error) { + if len(data) == 0 { + return 0, nil } - return 0, nil -} + batch := &b.Batch{} -func (pgx *pgxRepo) InsertRejectedMeteoData(ctx context.Context, data []RejectedMeteoData) (int, error) { - return 0, nil + for _, d := range data { + batch.Queue(insertRejectedMeteoData, d.RowValue, d.Reason) + } + + results := tx.SendBatch(ctx, batch) + + var rowsInserted int + for i := range data { + _, err := results.Exec() + rowsInserted++ + if err != nil { + results.Close() + return 0, fmt.Errorf("error executing batch command %d: %w", i, err) + } + } + + results.Close() + + return rowsInserted, nil }