meteologica/service_a/internal/domains/meteo/repository.go

224 lines
5.5 KiB
Go

package meteo
import (
"context"
"fmt"
"servicea/internal/domains"
"strings"
b "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// Repository is the persistence contract of the meteo domain. The comments
// below describe the behaviour of the pgx-backed implementation in this file.
type Repository interface {
// InsertMeteoDataTX stores an ingest batch together with its accepted and
// rejected rows inside a single transaction and reports the batch id and
// the number of rows written to each table.
InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error)
// UpdateBatchElapsedTime sets elapsed_ms on the ingest batch identified by batchID.
UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error
// GetCities lists the distinct location names, sorted by name. It has no
// error return, so callers cannot tell a failure from an empty result.
GetCities(ctx context.Context) []string
// GetMeteoData returns the rows matching the location, date range, limit
// and offset carried by params, newest date first.
GetMeteoData(ctx context.Context, params GetMeteoData) ([]MeteoData, error)
}
// pgxRepo is the Repository implementation backed by a pgx connection pool.
// The pool is embedded, so its Begin, Exec and Query methods are called
// directly on the receiver.
type pgxRepo struct {
*pgxpool.Pool
}
// NewPGXRepo wraps pool in a pgxRepo and hands it back as a Repository.
func NewPGXRepo(pool *pgxpool.Pool) Repository {
	repo := pgxRepo{Pool: pool}
	return &repo
}
// InsertMeteoDataParams is the input of Repository.InsertMeteoDataTX.
type InsertMeteoDataParams struct {
// FileChecksum is stored in ingest_batch.file_checksum for the new batch.
FileChecksum string
// Accepted holds the rows written to public.meteo_data.
Accepted []MeteoData
// Rejected holds the rows written to public.rejected_data.
Rejected []RejectedMeteoData
}
// InsertMeteoDataResult is the outcome of a committed InsertMeteoDataTX call.
type InsertMeteoDataResult struct {
// BatchID is the id returned by the insert into public.ingest_batch.
BatchID int
// AcceptedCount is the number of rows inserted into public.meteo_data.
AcceptedCount int
// RejectedCount is the number of rows inserted into public.rejected_data.
RejectedCount int
}
// InsertMeteoDataTX writes one ingest batch in a single transaction: first the
// batch row, then the accepted rows, then the rejected rows. Any failure
// aborts the whole transaction and a nil result is returned.
//
// Errors from the helper steps are returned unchanged (insertBatch and
// insertAcceptedMeteoData may return ErrRecordAlreadyExists); failures to
// begin or commit the transaction are wrapped.
func (pgx *pgxRepo) InsertMeteoDataTX(ctx context.Context, params InsertMeteoDataParams) (*InsertMeteoDataResult, error) {
	tx, err := pgx.Begin(ctx)
	if err != nil {
		return nil, fmt.Errorf("error starting transaction: %w", err)
	}
	// Undo everything on any early return; the Rollback result is discarded.
	// NOTE(review): pgx documents Rollback on an already committed Tx as
	// harmless (it reports ErrTxClosed) — confirm against the pinned version.
	defer tx.Rollback(ctx)

	out := &InsertMeteoDataResult{}

	if out.BatchID, err = pgx.insertBatch(ctx, tx, params.FileChecksum); err != nil {
		return nil, err
	}
	if out.AcceptedCount, err = pgx.insertAcceptedMeteoData(ctx, tx, out.BatchID, params.Accepted); err != nil {
		return nil, err
	}
	if out.RejectedCount, err = pgx.insertRejectedMeteoData(ctx, tx, out.BatchID, params.Rejected); err != nil {
		return nil, err
	}

	if commitErr := tx.Commit(ctx); commitErr != nil {
		return nil, fmt.Errorf("error committing transaction: %w", commitErr)
	}
	return out, nil
}
// insertBatch is the statement that creates an ingest_batch row and yields its id.
const insertBatch = `insert into public.ingest_batch (elapsed_ms, file_checksum) values ($1, $2) returning id`

// insertBatch creates the ingest_batch row for fileChecksum inside tx and
// returns the generated id. elapsed_ms is written as 0 here.
//
// When the error text contains domains.SQLSTATE_23505 the sentinel
// ErrRecordAlreadyExists is returned; every other failure is wrapped.
// NOTE(review): matching on the error text is fragile — presumably the
// constant is the unique-violation code; a typed check would be sturdier.
func (pgx *pgxRepo) insertBatch(ctx context.Context, tx b.Tx, fileChecksum string) (int, error) {
	var id int
	switch err := tx.QueryRow(ctx, insertBatch, 0, fileChecksum).Scan(&id); {
	case err == nil:
		return id, nil
	case strings.Contains(err.Error(), domains.SQLSTATE_23505):
		return 0, ErrRecordAlreadyExists
	default:
		return 0, fmt.Errorf("error inserting batch: %w", err)
	}
}
// insertAcceptedMeteoData is the statement that stores one accepted reading.
// The returned id is not read by the caller below (results are consumed with Exec).
const insertAcceptedMeteoData = `insert into public.meteo_data (batch_id, location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness) values ($1, $2, $3, $4, $5, $6, $7) returning id`

// insertAcceptedMeteoData queues one insert per element of data, sends them to
// the database as a single pgx batch on tx, and returns how many rows were
// inserted. An empty data slice is a no-op that returns (0, nil).
//
// On failure the count is 0. An error whose text contains
// domains.SQLSTATE_23505 is reported as ErrRecordAlreadyExists; other command
// failures are wrapped with the index of the failing command. A failure while
// closing the batch results is also returned instead of being dropped.
func (pgx *pgxRepo) insertAcceptedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []MeteoData) (int, error) {
	if len(data) == 0 {
		return 0, nil
	}

	batch := &b.Batch{}
	for _, d := range data {
		batch.Queue(insertAcceptedMeteoData, batchID, d.Location, d.Timestamp, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness)
	}

	results := tx.SendBatch(ctx, batch)
	for i := range data {
		if _, err := results.Exec(); err != nil {
			// The Exec error is the root cause; Close is only called to
			// release the batch, so its own error is intentionally ignored.
			_ = results.Close()
			if strings.Contains(err.Error(), domains.SQLSTATE_23505) {
				return 0, ErrRecordAlreadyExists
			}
			return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
		}
	}

	// Every queued command succeeded; surface a failure reported at close
	// time rather than claiming success.
	if err := results.Close(); err != nil {
		return 0, fmt.Errorf("error closing batch results: %w", err)
	}
	return len(data), nil
}
// insertRejectedMeteoData is the statement that stores one rejected input row
// together with the reason it was rejected. The returned id is not read by
// the caller below (results are consumed with Exec).
const insertRejectedMeteoData = `insert into public.rejected_data (batch_id, raw_data, reason) values ($1, $2, $3) returning id`

// insertRejectedMeteoData queues one insert per element of data, sends them to
// the database as a single pgx batch on tx, and returns how many rows were
// inserted. An empty data slice is a no-op that returns (0, nil).
//
// On failure the count is 0 and the error is wrapped with the index of the
// failing command. A failure while closing the batch results is also
// returned instead of being dropped.
func (pgx *pgxRepo) insertRejectedMeteoData(ctx context.Context, tx b.Tx, batchID int, data []RejectedMeteoData) (int, error) {
	if len(data) == 0 {
		return 0, nil
	}

	batch := &b.Batch{}
	for _, d := range data {
		batch.Queue(insertRejectedMeteoData, batchID, d.RowValue, d.Reason)
	}

	results := tx.SendBatch(ctx, batch)
	for i := range data {
		if _, err := results.Exec(); err != nil {
			// The Exec error is the root cause; Close is only called to
			// release the batch, so its own error is intentionally ignored.
			_ = results.Close()
			return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
		}
	}

	// Every queued command succeeded; surface a failure reported at close
	// time rather than claiming success.
	if err := results.Close(); err != nil {
		return 0, fmt.Errorf("error closing batch results: %w", err)
	}
	return len(data), nil
}
// updateBatchElapsedTime is the statement that overwrites elapsed_ms of one batch.
const updateBatchElapsedTime = `update public.ingest_batch set elapsed_ms = $1 where id = $2`

// UpdateBatchElapsedTime stores elapsedMS on the ingest batch with id batchID.
// It runs on the pool directly, outside any transaction opened by this type.
// The number of affected rows is not inspected, so an unknown batchID is not
// reported as an error.
func (pgx *pgxRepo) UpdateBatchElapsedTime(ctx context.Context, batchID int, elapsedMS int) error {
	if _, err := pgx.Exec(ctx, updateBatchElapsedTime, elapsedMS, batchID); err != nil {
		return fmt.Errorf("error updating batch elapsed time: %w", err)
	}
	return nil
}
// getCities is the statement that lists every distinct location name, sorted.
const getCities = `select distinct location_name from public.meteo_data order by location_name`

// GetCities returns the distinct location names found in public.meteo_data,
// ordered by name.
//
// The result is never nil: a failed query and an empty table both yield an
// empty slice, so callers (and JSON encoders) always see the same shape.
//
// The signature has no error return, so failures cannot be reported: a query
// error yields an empty slice and a row that fails to scan is left out.
// NOTE(review): iteration errors (rows.Err) are likewise not surfaced, and
// pgx may stop iterating after a failed Scan — consider adding an error
// return to the Repository interface.
func (pgx *pgxRepo) GetCities(ctx context.Context) []string {
	cities := []string{}

	rows, err := pgx.Query(ctx, getCities)
	if err != nil {
		return cities
	}
	defer rows.Close()

	for rows.Next() {
		var city string
		if err := rows.Scan(&city); err != nil {
			// Best effort: leave out rows that cannot be read as a string.
			continue
		}
		cities = append(cities, city)
	}
	return cities
}
// getMeteoData is the statement that selects the readings of one location
// within an inclusive date range, newest first, with limit/offset paging.
// The location argument is lower-cased and unaccented by the database before
// being compared with location_name_norm.
const getMeteoData = `
select location_name, date_of_register, max_temp, min_temp, rainfall, cloudiness
from public.meteo_data
where location_name_norm = unaccent(lower($1))
and date_of_register >= $2
and date_of_register <= $3
order by date_of_register desc
limit $4 offset $5
`

// GetMeteoData runs getMeteoData with params.Location, params.From, params.To,
// params.Limit and params.Offset and returns the matching rows.
//
// The returned slice is nil when no row matches. Query, scan and iteration
// failures are each wrapped and returned with a nil slice.
func (pgx *pgxRepo) GetMeteoData(ctx context.Context, params GetMeteoData) ([]MeteoData, error) {
	rows, err := pgx.Query(ctx, getMeteoData, params.Location, params.From, params.To, params.Limit, params.Offset)
	if err != nil {
		return nil, fmt.Errorf("error querying meteo data: %w", err)
	}
	defer rows.Close()

	var out []MeteoData
	for rows.Next() {
		var m MeteoData
		// Destination order must match the column order of the select list.
		scanErr := rows.Scan(&m.Location, &m.Timestamp, &m.MaxTemp, &m.MinTemp, &m.Rainfall, &m.Cloudiness)
		if scanErr != nil {
			return nil, fmt.Errorf("error scanning row: %w", scanErr)
		}
		out = append(out, m)
	}

	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("error iterating rows: %w", iterErr)
	}
	return out, nil
}