From 3175c5c23abcc523cec80c700642b86510c99a62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20P=C3=A9rez?= Date: Tue, 22 Jul 2025 00:11:49 +0200 Subject: [PATCH] add sse handle with std lib --- broker.go | 172 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 113 insertions(+), 59 deletions(-) diff --git a/broker.go b/broker.go index c8593e8..25bf63f 100644 --- a/broker.go +++ b/broker.go @@ -6,6 +6,7 @@ import ( "fmt" "html/template" "log/slog" + "net/http" "strings" "sync" "time" @@ -120,8 +121,98 @@ func (b *SSEBroker) Broadcast(event string, data ...any) { b.broadcast <- Message{Event: event, Data: payload} } -func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error { +// Interface para abstraer las diferencias entre Fiber y HTTP estándar +type SSEWriter interface { + Write(data []byte) (int, error) + Flush() error +} +// Wrapper para http.ResponseWriter +type httpSSEWriter struct { + w http.ResponseWriter +} + +func (h *httpSSEWriter) Write(data []byte) (int, error) { + return h.w.Write(data) +} + +func (h *httpSSEWriter) Flush() error { + if flusher, ok := h.w.(http.Flusher); ok { + flusher.Flush() + } + return nil +} + +// Wrapper para bufio.Writer +type fiberSSEWriter struct { + w *bufio.Writer +} + +func (f *fiberSSEWriter) Write(data []byte) (int, error) { + return f.w.Write(data) +} + +func (f *fiberSSEWriter) Flush() error { + return f.w.Flush() +} + +func (b *SSEBroker) handleSSEConnection(clientID string, client *Client, writer SSEWriter) { + slog.Info("SSE connection established", "client_id", clientID) + + defer func() { + b.UnregisterClient(clientID) + slog.Info("SSE connection closed", "client_id", clientID) + }() + + fmt.Fprintf(writer, "event: client_id\n") + fmt.Fprintf(writer, "data: %s\n\n", clientID) + writer.Flush() + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case message := <-client.Send: + var data string + switch v := message.Data.(type) { + case string: + data = v + case template.HTML: + data = string(v) + default: + jsonData, err := json.Marshal(v) + if err != nil { + slog.Error("error marshaling message", "error", err) + continue + } + data = string(jsonData) + } + + fmt.Fprintf(writer, "event: %s\n", message.Event) + scanner := bufio.NewScanner(strings.NewReader(data)) + for scanner.Scan() { + fmt.Fprintf(writer, "data: %s\n", scanner.Text()) + } + fmt.Fprint(writer, "\n") + + if err := writer.Flush(); err != nil { + slog.Warn("flush error, closing client", "client_id", clientID) + return + } + + case <-ticker.C: + fmt.Fprintf(writer, "event: ping\n") + fmt.Fprintf(writer, "data: %s\n\n", "ping") + writer.Flush() + + case <-client.Close: + return + } + } +} + +func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") @@ -131,69 +222,32 @@ func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error { client := b.RegisterClient(clientID) c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - slog.Info("SSE connection established", "client_id", clientID) - - defer func() { - b.UnregisterClient(clientID) - slog.Info("SSE connection closed", "client_id", clientID) - }() - - fmt.Fprintf(w, "event: client_id\n") - fmt.Fprintf(w, "data: %s\n\n", clientID) - w.Flush() - - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for { - select { - case message := <-client.Send: - var data string - switch v := message.Data.(type) { - case string: - data = v - case template.HTML: - data = string(v) - default: - jsonData, err := json.Marshal(v) - if err != nil { - slog.Error("error marshaling message", "error", err) - continue - } - data = string(jsonData) - } - - fmt.Fprintf(w, "event: %s\n", message.Event) - scanner := bufio.NewScanner(strings.NewReader(data)) - for scanner.Scan() { - fmt.Fprintf(w, "data: %s\n", scanner.Text()) - } - fmt.Fprint(w, "\n") - - if err := w.Flush(); err != nil { - slog.Warn("flush error, closing client", "client_id", clientID) - return - } - case <-ticker.C: - fmt.Fprintf(w, "event: ping\n") - fmt.Fprintf(w, "data: %s\n\n", "ping") - - case <-client.Close: - return - - default: - err := w.Flush() - if err != nil { - slog.Warn("error while flushing", "error", err) - return - } - } - } + writer := &fiberSSEWriter{w: w} + b.handleSSEConnection(clientID, client, writer) })) return nil } +func (b *SSEBroker) HandleHTTPSSE(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Transfer-Encoding", "chunked") + + clientID := uuid.New().String() + client := b.RegisterClient(clientID) + + writer := &httpSSEWriter{w: w} + + go func() { + b.handleSSEConnection(clientID, client, writer) + }() + + <-r.Context().Done() + slog.Info("client disconnected", "client_id", clientID) +} + func (b *SSEBroker) Shutdown() { close(b.done) }