package meteo import ( "context" "fmt" b "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) type InsertMeteoDataParams struct { FileChecksum string Accepted []MeteoData Rejected []RejectedMeteoData } type InsertMeteoDataResult struct { BatchID int AcceptedCount int RejectedCount int } type Repository interface { InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error) UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error } type pgxRepo struct { *pgxpool.Pool } func NewPGXRepo(pool *pgxpool.Pool) Repository { return &pgxRepo{ pool, } } func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error) { tx, err := pgx.Begin(ctx) if err != nil { return nil, fmt.Errorf("error starting transaction: %w", err) } defer tx.Rollback(ctx) batchID, err := pgx.insertBatch(ctx, tx, params.FileChecksum) if err != nil { return nil, err } acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, batchID, params.Accepted) if err != nil { return nil, err } rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, batchID, params.Rejected) if err != nil { return nil, err } if err = tx.Commit(ctx); err != nil { return nil, fmt.Errorf("error committing transaction: %w", err) } return &InsertMeteoDataResult{ BatchID: batchID, AcceptedCount: acceptedCount, RejectedCount: rejectedCount, }, nil } const insertBatch = `insert into public.ingest_batch (elapsed_ms, file_checksum) values ($1, $2) returning id` func (pgx *pgxRepo) insertBatch(ctx context.Context, tx b.Tx, fileChecksum string) (int, error) { var batchID int err := tx.QueryRow(ctx, insertBatch, 0, fileChecksum).Scan(&batchID) if err != nil { return 0, fmt.Errorf("error inserting batch: %w", err) } return batchID, nil } const insertAcceptedMeteoData = `insert into public.meteo_data (batch_id, location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness) values ($1, $2, $3, $4, $5, $6, $7) returning id` func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []MeteoData) (int, error) { if len(data) == 0 { return 0, nil } batch := &b.Batch{} for _, d := range data { batch.Queue(insertAcceptedMeteoData, batchID, d.Location, d.Timestamp, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness) } results := tx.SendBatch(ctx, batch) var rowsInserted int for i := range data { _, err := results.Exec() rowsInserted++ if err != nil { results.Close() return 0, fmt.Errorf("error executing batch command %d: %w", i, err) } } results.Close() return rowsInserted, nil } const insertRejectedMeteoData = `insert into public.rejected_data (batch_id, raw_data, reason) values ($1, $2, $3) returning id` func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []RejectedMeteoData) (int, error) { if len(data) == 0 { return 0, nil } batch := &b.Batch{} for _, d := range data { batch.Queue(insertRejectedMeteoData, batchID, d.RowValue, d.Reason) } results := tx.SendBatch(ctx, batch) var rowsInserted int for i := range data { _, err := results.Exec() rowsInserted++ if err != nil { results.Close() return 0, fmt.Errorf("error executing batch command %d: %w", i, err) } } results.Close() return rowsInserted, nil } const updateBatchElapsedTime = `update public.ingest_batch set elapsed_ms = $1 where id = $2` func (pgx *pgxRepo) UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error { _, err := pgx.Exec(ctx, updateBatchElapsedTime, elapsedMS, batchID) if err != nil { return fmt.Errorf("error updating batch elapsed time: %w", err) } return nil }