From 8670442dbc95c02bfb85d6c2b27b15f42263b20e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20P=C3=A9rez?= Date: Tue, 22 Jul 2025 00:04:52 +0200 Subject: [PATCH] update broker --- app.go | 4 +- broker.go | 245 ++++++++++++++++++++++++++++++------------------------ go.mod | 10 +++ go.sum | 23 +++++ pgx.go | 3 + 5 files changed, 175 insertions(+), 110 deletions(-) diff --git a/app.go b/app.go index 98ee73a..ef2c8ee 100644 --- a/app.go +++ b/app.go @@ -111,6 +111,8 @@ func NewApp(config ...Config) *App { CreateTemplates: false, } + slog.Debug("NewApp", "config", cfg) + if len(config) > 0 { cfg = config[0] if cfg.LogLevel == slog.LevelDebug { @@ -380,7 +382,7 @@ func newLogger(level slog.Level) { mw := io.MultiWriter(os.Stdout, f) logger := slog.New(slog.NewTextHandler(mw, &slog.HandlerOptions{ - AddSource: true, + AddSource: os.Getenv("ENV_MODE") == "" || os.Getenv("ENV_MODE") == "development", Level: level, })) diff --git a/broker.go b/broker.go index bfbde45..aba2d4d 100644 --- a/broker.go +++ b/broker.go @@ -6,167 +6,194 @@ import ( "fmt" "html/template" "log/slog" - "net/http" "strings" "sync" "time" + "github.com/gofiber/fiber/v2" "github.com/google/uuid" + "github.com/valyala/fasthttp" ) type Message struct { - Event string - Data any + Event string `json:"event"` + Data any `json:"data"` } type Client struct { - ID string - Send chan Message - Close chan struct{} - IsActive bool + ID string + Send chan Message + Close chan struct{} } type SSEBroker struct { - clients map[string]*Client - mu sync.RWMutex - ticker *time.Ticker - done chan struct{} + clients map[string]*Client + mu sync.RWMutex + register chan *Client + unregister chan *Client + broadcast chan Message + done chan struct{} } func newSSEBroker() *SSEBroker { - s := &SSEBroker{ - clients: make(map[string]*Client), - ticker: time.NewTicker(time.Minute), - done: make(chan struct{}), + b := &SSEBroker{ + clients: make(map[string]*Client), + register: make(chan *Client), + unregister: make(chan *Client), + broadcast: make(chan Message), + done: make(chan struct{}), } - go s.run() - return s + go b.run() + return b } -func (s *SSEBroker) run() { +func (b *SSEBroker) run() { for { select { - case <-s.ticker.C: - s.Broadcast("ticker") - case <-s.done: - s.ticker.Stop() + case client := <-b.register: + b.mu.Lock() + b.clients[client.ID] = client + b.mu.Unlock() + slog.Info("client registered", "client_id", client.ID) + slog.Debug("clients", "clients", b.clients) + + case client := <-b.unregister: + b.mu.Lock() + if _, ok := b.clients[client.ID]; ok { + delete(b.clients, client.ID) + close(client.Send) + close(client.Close) + slog.Info("client unregistered", "client_id", client.ID) + } + b.mu.Unlock() + + case message := <-b.broadcast: + b.mu.RLock() + for _, client := range b.clients { + select { + case client.Send <- message: + default: + // Si el canal está lleno, se desconecta el cliente + close(client.Send) + close(client.Close) + b.mu.RUnlock() + b.mu.Lock() + delete(b.clients, client.ID) + b.mu.Unlock() + b.mu.RLock() + slog.Warn("client removed due to full channel", "client_id", client.ID) + } + } + b.mu.RUnlock() + + case <-b.done: return } } } -func (s *SSEBroker) registerClient(id string) *Client { - s.mu.Lock() - defer s.mu.Unlock() - +func (b *SSEBroker) RegisterClient(clientID string) *Client { client := &Client{ - ID: id, - Send: make(chan Message, 100), - Close: make(chan struct{}), - IsActive: true, + ID: clientID, + Send: make(chan Message, 10), + Close: make(chan struct{}), } - s.clients[id] = client + b.register <- client return client } -func (s *SSEBroker) unregisterClient(id string) { - s.mu.Lock() - defer s.mu.Unlock() - - if client, exists := s.clients[id]; exists { - close(client.Close) - delete(s.clients, id) +func (b *SSEBroker) UnregisterClient(clientID string) { + b.mu.RLock() + if client, ok := b.clients[clientID]; ok { + b.mu.RUnlock() + b.unregister <- client + } else { + b.mu.RUnlock() } } -func (s *SSEBroker) Broadcast(event string, data ...any) { - s.mu.RLock() - defer s.mu.RUnlock() - - var dataValue any +func (b *SSEBroker) Broadcast(event string, data ...any) { + var payload any if len(data) > 0 { - dataValue = data[0] - } - - message := Message{ - Event: event, - Data: dataValue, - } - - for _, client := range s.clients { - if client.IsActive { - select { - case client.Send <- message: - default: - slog.Warn("client channel full, skipping message", "client_id", client.ID) - } - } + payload = data[0] } + b.broadcast <- Message{Event: event, Data: payload} } -func (s *SSEBroker) HandleSSE(w http.ResponseWriter, r *http.Request) { - slog.Debug("SSE called") +func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error { - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Transfer-Encoding", "chunked") + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") clientID := uuid.New().String() - client := s.registerClient(clientID) - defer s.unregisterClient(clientID) + client := b.RegisterClient(clientID) - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) - return - } + c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + slog.Info("SSE connection established", "client_id", clientID) - slog.Info("SSE connection established", "client_id", clientID) + defer func() { + b.UnregisterClient(clientID) + slog.Info("SSE connection closed", "client_id", clientID) + }() - fmt.Fprintf(w, "event: connection\ndata: %s\n\n", clientID) - flusher.Flush() + fmt.Fprintf(w, "event: client_id\n") + fmt.Fprintf(w, "data: %s\n\n", clientID) + w.Flush() - clientGone := r.Context().Done() + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() - for { - select { - case message := <-client.Send: - slog.Info("message", "message", message) - var data string - switch v := message.Data.(type) { - case string: - data = v - case template.HTML: - data = string(v) - default: - jsonData, err := json.Marshal(v) - if err != nil { - slog.Error("error marshaling message", "error", err) - continue + for { + select { + case message := <-client.Send: + var data string + switch v := message.Data.(type) { + case string: + data = v + case template.HTML: + data = string(v) + default: + jsonData, err := json.Marshal(v) + if err != nil { + slog.Error("error marshaling message", "error", err) + continue + } + data = string(jsonData) } - data = string(jsonData) - } - fmt.Fprintf(w, "event: %s\n", message.Event) - scanner := bufio.NewScanner(strings.NewReader(data)) - for scanner.Scan() { - fmt.Fprintf(w, "data: %s\n", scanner.Text()) - } + fmt.Fprintf(w, "event: %s\n", message.Event) + scanner := bufio.NewScanner(strings.NewReader(data)) + for scanner.Scan() { + fmt.Fprintf(w, "data: %s\n", scanner.Text()) + } + fmt.Fprint(w, "\n") - fmt.Fprint(w, "\n") - case <-client.Close: - slog.Info("client closed", "client_id", clientID) - return - case <-clientGone: - slog.Info("client gone", "client_id", clientID) - return + if err := w.Flush(); err != nil { + slog.Warn("flush error, closing client", "client_id", clientID) + return + } + case <-ticker.C: + fmt.Fprintf(w, "event: ping\n") + fmt.Fprintf(w, "data: %s\n\n", "ping") + + case <-client.Close: + return + + default: + err := w.Flush() + if err != nil { + slog.Warn("error while flushing", "error", err) + return + } + } } + })) - flusher.Flush() - } + return nil } -func (s *SSEBroker) Shutdown() { - close(s.done) +func (b *SSEBroker) Shutdown() { + close(b.done) } diff --git a/go.mod b/go.mod index 866d6e7..8a97959 100644 --- a/go.mod +++ b/go.mod @@ -5,21 +5,31 @@ go 1.24 require ( github.com/alexedwards/scs/v2 v2.8.0 github.com/go-sql-driver/mysql v1.9.2 + github.com/gofiber/fiber/v2 v2.52.9 github.com/golang-migrate/migrate/v4 v4.18.3 github.com/google/uuid v1.6.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.7.4 github.com/stretchr/testify v1.10.0 + github.com/valyala/fasthttp v1.51.0 ) require ( aidanwoods.dev/go-result v0.3.1 // indirect filippo.io/edwards25519 v1.1.0 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/lib/pq v1.10.9 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 5c6d05b..88f5842 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/alexedwards/scs/v2 v2.8.0 h1:h31yUYoycPuL0zt14c0gd+oqxfRwIj6SOjHdKRZxhEw= github.com/alexedwards/scs/v2 v2.8.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -31,6 +33,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-sql-driver/mysql v1.9.2 h1:4cNKDYQ1I84SXslGddlsrMhc8k4LeDVj6Ad6WRjiHuU= github.com/go-sql-driver/mysql v1.9.2/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= +github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw= +github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-migrate/migrate/v4 v4.18.3 h1:EYGkoOsvgHHfm5U/naS1RP/6PL/Xv3S4B/swMiAmDLs= @@ -61,12 +65,21 @@ github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg= github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= @@ -81,6 +94,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -89,6 +104,12 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= +github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= @@ -103,6 +124,8 @@ golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= diff --git a/pgx.go b/pgx.go index 6682003..a4c0dcb 100644 --- a/pgx.go +++ b/pgx.go @@ -21,6 +21,9 @@ var ( ) func (a *App) newPGXPool(name string) *pgxpool.Pool { + + slog.Debug("newPGXPool", "name", name, "datasource", a.Datasource(name)) + pgxMutex.Lock() defer pgxMutex.Unlock()