add context propagation
This commit is contained in:
parent
811dfc7507
commit
ab4af962c7
@ -50,7 +50,7 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err = h.Service.IngestCSV(bytes.NewReader(content), fileStats)
|
err = h.Service.IngestCSV(r.Context(), bytes.NewReader(content), fileStats)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error(ErrCannotParseFile.Error(),
|
slog.Error(ErrCannotParseFile.Error(),
|
||||||
"filename", header.Filename,
|
"filename", header.Filename,
|
||||||
|
|||||||
@ -9,8 +9,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Repository interface {
|
type Repository interface {
|
||||||
InsertAcceptedMeteoData(data []MeteoData) (int, error)
|
InsertAcceptedMeteoData(ctx context.Context, data []MeteoData) (int, error)
|
||||||
InsertRejectedMeteoData(data []RejectedMeteoData) (int, error)
|
InsertRejectedMeteoData(ctx context.Context, data []RejectedMeteoData) (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type pgxRepo struct {
|
type pgxRepo struct {
|
||||||
@ -25,22 +25,23 @@ func NewPGXRepo(pool *pgxpool.Pool) Repository {
|
|||||||
|
|
||||||
const insertAcceptedMeteoData = `insert into public.meteo_data (location_id, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id`
|
const insertAcceptedMeteoData = `insert into public.meteo_data (location_id, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id`
|
||||||
|
|
||||||
func (pgx *pgxRepo) InsertAcceptedMeteoData(data []MeteoData) (int, error) {
|
func (pgx *pgxRepo) InsertAcceptedMeteoData(ctx context.Context, data []MeteoData) (int, error) {
|
||||||
// TODO: pass context
|
// TODO pass context
|
||||||
// TODO improve transaction
|
// TODO improve transaction
|
||||||
tx, err := pgx.Begin(context.Background())
|
tx, err := pgx.Begin(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("error starting transaction: %w", err)
|
return 0, fmt.Errorf("error starting transaction: %w", err)
|
||||||
}
|
}
|
||||||
defer tx.Rollback(context.Background())
|
defer tx.Rollback(ctx)
|
||||||
|
|
||||||
batch := &b.Batch{}
|
batch := &b.Batch{}
|
||||||
|
|
||||||
for _, d := range data {
|
for _, d := range data {
|
||||||
|
// TODO get city id before insert!
|
||||||
batch.Queue(insertAcceptedMeteoData, 1, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp)
|
batch.Queue(insertAcceptedMeteoData, 1, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
results := tx.SendBatch(context.Background(), batch)
|
results := tx.SendBatch(ctx, batch)
|
||||||
|
|
||||||
for i := range data {
|
for i := range data {
|
||||||
_, err := results.Exec()
|
_, err := results.Exec()
|
||||||
@ -52,13 +53,13 @@ func (pgx *pgxRepo) InsertAcceptedMeteoData(data []MeteoData) (int, error) {
|
|||||||
|
|
||||||
results.Close()
|
results.Close()
|
||||||
|
|
||||||
if err = tx.Commit(context.Background()); err != nil {
|
if err = tx.Commit(ctx); err != nil {
|
||||||
return 0, fmt.Errorf("error committing transaction: %w", err)
|
return 0, fmt.Errorf("error committing transaction: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pgx *pgxRepo) InsertRejectedMeteoData(data []RejectedMeteoData) (int, error) {
|
func (pgx *pgxRepo) InsertRejectedMeteoData(ctx context.Context, data []RejectedMeteoData) (int, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package meteo
|
package meteo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
)
|
)
|
||||||
@ -17,22 +18,22 @@ func NewService(repo Repository) *Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) IngestCSV(r io.Reader, fs *FileStats) error {
|
func (s *Service) IngestCSV(ctx context.Context, r io.Reader, fs *FileStats) error {
|
||||||
accepted, rejected, err := s.csv.Parse(r)
|
accepted, rejected, err := s.csv.Parse(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO call insertToDB
|
// TODO call insertToDB
|
||||||
s.insertAcceptedToDB(accepted, fs)
|
s.insertAcceptedToDB(ctx, accepted, fs)
|
||||||
s.insertRejectedToDB(rejected, fs)
|
s.insertRejectedToDB(ctx, rejected, fs)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) insertAcceptedToDB(data []MeteoData, fs *FileStats) error {
|
func (s *Service) insertAcceptedToDB(ctx context.Context, data []MeteoData, fs *FileStats) error {
|
||||||
var err error
|
var err error
|
||||||
fs.RowsInserted, err = s.repo.InsertAcceptedMeteoData(data)
|
fs.RowsInserted, err = s.repo.InsertAcceptedMeteoData(ctx, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("error", "err", err)
|
slog.Error("error", "err", err)
|
||||||
return err
|
return err
|
||||||
@ -41,9 +42,9 @@ func (s *Service) insertAcceptedToDB(data []MeteoData, fs *FileStats) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) insertRejectedToDB(data []RejectedMeteoData, fs *FileStats) error {
|
func (s *Service) insertRejectedToDB(ctx context.Context, data []RejectedMeteoData, fs *FileStats) error {
|
||||||
var err error
|
var err error
|
||||||
fs.RowsRejected, err = s.repo.InsertRejectedMeteoData(data)
|
fs.RowsRejected, err = s.repo.InsertRejectedMeteoData(ctx, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,11 +1,13 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"servicea/internal/app"
|
"servicea/internal/app"
|
||||||
"servicea/internal/domains/meteo"
|
"servicea/internal/domains/meteo"
|
||||||
"servicea/internal/router"
|
"servicea/internal/router"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -18,6 +20,17 @@ func main() {
|
|||||||
meteoHandler := meteo.NewHandler(meteoService)
|
meteoHandler := meteo.NewHandler(meteoService)
|
||||||
meteo.RegisterRoutes(mux, meteoHandler)
|
meteo.RegisterRoutes(mux, meteoHandler)
|
||||||
|
|
||||||
slog.Info("server starting on :8080")
|
server := http.Server{
|
||||||
http.ListenAndServe(":8080", mux)
|
Addr: ":8080",
|
||||||
|
Handler: mux,
|
||||||
|
ReadTimeout: 15 * time.Second,
|
||||||
|
WriteTimeout: 15 * time.Second,
|
||||||
|
IdleTimeout: 60 * time.Second,
|
||||||
|
ReadHeaderTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("server starting on :8080")
|
||||||
|
if err := server.ListenAndServe(); err != nil {
|
||||||
|
panic(fmt.Sprintf("server failed, error %s", err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user