meteologica/service_a/internal/domains/meteo/repository.go

108 lines
2.4 KiB
Go

package meteo
import (
"context"
"fmt"
b "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type Repository interface {
InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error)
}
type pgxRepo struct {
*pgxpool.Pool
}
func NewPGXRepo(pool *pgxpool.Pool) Repository {
return &pgxRepo{
pool,
}
}
const insertAcceptedMeteoData = `insert into public.meteo_data (location_name, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id`
func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, accepted []MeteoData, rejected []RejectedMeteoData) (int, int, error) {
tx, err := pgx.Begin(ctx)
if err != nil {
return 0, 0, fmt.Errorf("error starting transaction: %w", err)
}
defer tx.Rollback(ctx)
acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, accepted)
if err != nil {
return 0, 0, err
}
rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, rejected)
if err != nil {
return 0, 0, err
}
if err = tx.Commit(ctx); err != nil {
return 0, 0, fmt.Errorf("error committing transaction: %w", err)
}
return acceptedCount, rejectedCount, nil
}
func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, data []MeteoData) (int, error) {
if len(data) == 0 {
return 0, nil
}
batch := &b.Batch{}
for _, d := range data {
batch.Queue(insertAcceptedMeteoData, d.Location, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp)
}
results := tx.SendBatch(ctx, batch)
var rowsInserted int
for i := range data {
_, err := results.Exec()
rowsInserted++
if err != nil {
results.Close()
return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
}
}
results.Close()
return rowsInserted, nil
}
const insertRejectedMeteoData = `insert into public.rejected_data (raw_data, reason) values ($1, $2) returning id`
func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, data []RejectedMeteoData) (int, error) {
if len(data) == 0 {
return 0, nil
}
batch := &b.Batch{}
for _, d := range data {
batch.Queue(insertRejectedMeteoData, d.RowValue, d.Reason)
}
results := tx.SendBatch(ctx, batch)
var rowsInserted int
for i := range data {
_, err := results.Exec()
rowsInserted++
if err != nil {
results.Close()
return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
}
}
results.Close()
return rowsInserted, nil
}