meteologica/service_a/internal/domains/meteo/repository.go

193 lines
4.8 KiB
Go

package meteo
import (
"context"
"fmt"
b "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type Repository interface {
InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error)
UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error
GetMeteoData(ctx context.Context, params GetMeteoData) ([]MeteoData, error)
}
type pgxRepo struct {
*pgxpool.Pool
}
func NewPGXRepo(pool *pgxpool.Pool) Repository {
return &pgxRepo{
pool,
}
}
type InsertMeteoDataParams struct {
FileChecksum string
Accepted []MeteoData
Rejected []RejectedMeteoData
}
type InsertMeteoDataResult struct {
BatchID int
AcceptedCount int
RejectedCount int
}
func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error) {
tx, err := pgx.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("error starting transaction: %w", err)
}
defer tx.Rollback(ctx)
batchID, err := pgx.insertBatch(ctx, tx, params.FileChecksum)
if err != nil {
return nil, err
}
acceptedCount, err := pgx.insertAcceptedMeteoData(ctx, tx, batchID, params.Accepted)
if err != nil {
return nil, err
}
rejectedCount, err := pgx.insertRejectedMeteoData(ctx, tx, batchID, params.Rejected)
if err != nil {
return nil, err
}
if err = tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("error committing transaction: %w", err)
}
return &InsertMeteoDataResult{
BatchID: batchID,
AcceptedCount: acceptedCount,
RejectedCount: rejectedCount,
}, nil
}
const insertBatch = `insert into public.ingest_batch (elapsed_ms, file_checksum) values ($1, $2) returning id`
func (pgx *pgxRepo) insertBatch(ctx context.Context, tx b.Tx, fileChecksum string) (int, error) {
var batchID int
err := tx.QueryRow(ctx, insertBatch, 0, fileChecksum).Scan(&batchID)
if err != nil {
return 0, fmt.Errorf("error inserting batch: %w", err)
}
return batchID, nil
}
const insertAcceptedMeteoData = `insert into public.meteo_data (batch_id, location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness) values ($1, $2, $3, $4, $5, $6, $7) returning id`
func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []MeteoData) (int, error) {
if len(data) == 0 {
return 0, nil
}
batch := &b.Batch{}
for _, d := range data {
batch.Queue(insertAcceptedMeteoData, batchID, d.Location, d.Timestamp, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness)
}
results := tx.SendBatch(ctx, batch)
var rowsInserted int
for i := range data {
_, err := results.Exec()
rowsInserted++
if err != nil {
results.Close()
return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
}
}
results.Close()
return rowsInserted, nil
}
const insertRejectedMeteoData = `insert into public.rejected_data (batch_id, raw_data, reason) values ($1, $2, $3) returning id`
func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []RejectedMeteoData) (int, error) {
if len(data) == 0 {
return 0, nil
}
batch := &b.Batch{}
for _, d := range data {
batch.Queue(insertRejectedMeteoData, batchID, d.RowValue, d.Reason)
}
results := tx.SendBatch(ctx, batch)
var rowsInserted int
for i := range data {
_, err := results.Exec()
rowsInserted++
if err != nil {
results.Close()
return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
}
}
results.Close()
return rowsInserted, nil
}
const updateBatchElapsedTime = `update public.ingest_batch set elapsed_ms = $1 where id = $2`
func (pgx *pgxRepo) UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error {
_, err := pgx.Exec(ctx, updateBatchElapsedTime, elapsedMS, batchID)
if err != nil {
return fmt.Errorf("error updating batch elapsed time: %w", err)
}
return nil
}
const getMeteoDataQuery = `
select location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness
from public.meteo_data
where location_name = $1
and date_of_register >= $2
and date_of_register <= $3
order by date_of_register desc
limit $4 offset $5
`
func (pgx *pgxRepo) GetMeteoData(ctx context.Context, params GetMeteoData) ([]MeteoData, error) {
rows, err := pgx.Query(ctx, getMeteoDataQuery, params.Location, params.From, params.To, params.Limit, params.Offset)
if err != nil {
return nil, fmt.Errorf("error querying meteo data: %w", err)
}
defer rows.Close()
var results []MeteoData
for rows.Next() {
var data MeteoData
err := rows.Scan(
&data.Location,
&data.Timestamp,
&data.MaxTemp,
&data.MinTemp,
&data.Rainfall,
&data.Cloudiness,
)
if err != nil {
return nil, fmt.Errorf("error scanning row: %w", err)
}
results = append(results, data)
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %w", err)
}
return results, nil
}