diff --git a/service_a/internal/domains/meteo/handlers.go b/service_a/internal/domains/meteo/handlers.go index b71dbf0..66703c9 100644 --- a/service_a/internal/domains/meteo/handlers.go +++ b/service_a/internal/domains/meteo/handlers.go @@ -50,7 +50,7 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) { } start := time.Now() - err = h.Service.IngestCSV(bytes.NewReader(content), fileStats) + err = h.Service.IngestCSV(r.Context(), bytes.NewReader(content), fileStats) if err != nil { slog.Error(ErrCannotParseFile.Error(), "filename", header.Filename, diff --git a/service_a/internal/domains/meteo/repository.go b/service_a/internal/domains/meteo/repository.go index 85cb6da..74c2556 100644 --- a/service_a/internal/domains/meteo/repository.go +++ b/service_a/internal/domains/meteo/repository.go @@ -9,8 +9,8 @@ import ( ) type Repository interface { - InsertAcceptedMeteoData(data []MeteoData) (int, error) - InsertRejectedMeteoData(data []RejectedMeteoData) (int, error) + InsertAcceptedMeteoData(ctx context.Context, data []MeteoData) (int, error) + InsertRejectedMeteoData(ctx context.Context, data []RejectedMeteoData) (int, error) } type pgxRepo struct { @@ -25,22 +25,23 @@ func NewPGXRepo(pool *pgxpool.Pool) Repository { const insertAcceptedMeteoData = `insert into public.meteo_data (location_id, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id` -func (pgx *pgxRepo) InsertAcceptedMeteoData(data []MeteoData) (int, error) { - // TODO: pass context +func (pgx *pgxRepo) InsertAcceptedMeteoData(ctx context.Context, data []MeteoData) (int, error) { + // TODO pass context // TODO improve transaction - tx, err := pgx.Begin(context.Background()) + tx, err := pgx.Begin(ctx) if err != nil { return 0, fmt.Errorf("error starting transaction: %w", err) } - defer tx.Rollback(context.Background()) + defer tx.Rollback(ctx) batch := &b.Batch{} for _, d := range data { + // TODO get city id before insert! batch.Queue(insertAcceptedMeteoData, 1, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp) } - results := tx.SendBatch(context.Background(), batch) + results := tx.SendBatch(ctx, batch) for i := range data { _, err := results.Exec() @@ -52,13 +53,13 @@ func (pgx *pgxRepo) InsertAcceptedMeteoData(data []MeteoData) (int, error) { results.Close() - if err = tx.Commit(context.Background()); err != nil { + if err = tx.Commit(ctx); err != nil { return 0, fmt.Errorf("error committing transaction: %w", err) } return 0, nil } -func (pgx *pgxRepo) InsertRejectedMeteoData(data []RejectedMeteoData) (int, error) { +func (pgx *pgxRepo) InsertRejectedMeteoData(ctx context.Context, data []RejectedMeteoData) (int, error) { return 0, nil } diff --git a/service_a/internal/domains/meteo/service.go b/service_a/internal/domains/meteo/service.go index 156e5cd..ee25db1 100644 --- a/service_a/internal/domains/meteo/service.go +++ b/service_a/internal/domains/meteo/service.go @@ -1,6 +1,7 @@ package meteo import ( + "context" "io" "log/slog" ) @@ -17,22 +18,22 @@ func NewService(repo Repository) *Service { } } -func (s *Service) IngestCSV(r io.Reader, fs *FileStats) error { +func (s *Service) IngestCSV(ctx context.Context, r io.Reader, fs *FileStats) error { accepted, rejected, err := s.csv.Parse(r) if err != nil { return err } // TODO call insertToDB - s.insertAcceptedToDB(accepted, fs) - s.insertRejectedToDB(rejected, fs) + s.insertAcceptedToDB(ctx, accepted, fs) + s.insertRejectedToDB(ctx, rejected, fs) return nil } -func (s *Service) insertAcceptedToDB(data []MeteoData, fs *FileStats) error { +func (s *Service) insertAcceptedToDB(ctx context.Context, data []MeteoData, fs *FileStats) error { var err error - fs.RowsInserted, err = s.repo.InsertAcceptedMeteoData(data) + fs.RowsInserted, err = s.repo.InsertAcceptedMeteoData(ctx, data) if err != nil { slog.Error("error", "err", err) return err @@ -41,9 +42,9 @@ func (s *Service) insertAcceptedToDB(data []MeteoData, fs *FileStats) error { return nil } -func (s *Service) insertRejectedToDB(data []RejectedMeteoData, fs *FileStats) error { +func (s *Service) insertRejectedToDB(ctx context.Context, data []RejectedMeteoData, fs *FileStats) error { var err error - fs.RowsRejected, err = s.repo.InsertRejectedMeteoData(data) + fs.RowsRejected, err = s.repo.InsertRejectedMeteoData(ctx, data) if err != nil { return err } diff --git a/service_a/server/main.go b/service_a/server/main.go index ad8fedb..03b35ea 100644 --- a/service_a/server/main.go +++ b/service_a/server/main.go @@ -1,11 +1,13 @@ package main import ( + "fmt" "log/slog" "net/http" "servicea/internal/app" "servicea/internal/domains/meteo" "servicea/internal/router" + "time" ) func main() { @@ -18,6 +20,17 @@ func main() { meteoHandler := meteo.NewHandler(meteoService) meteo.RegisterRoutes(mux, meteoHandler) + server := http.Server{ + Addr: ":8080", + Handler: mux, + ReadTimeout: 15 * time.Second, + WriteTimeout: 15 * time.Second, + IdleTimeout: 60 * time.Second, + ReadHeaderTimeout: 5 * time.Second, + } + slog.Info("server starting on :8080") - http.ListenAndServe(":8080", mux) + if err := server.ListenAndServe(); err != nil { + panic(fmt.Sprintf("server failed, error %s", err)) + } }