add sse handle with std lib

This commit is contained in:
Pedro Pérez 2025-07-22 00:11:49 +02:00
parent 62d3553e7a
commit 3175c5c23a

172
broker.go
View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"html/template" "html/template"
"log/slog" "log/slog"
"net/http"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -120,8 +121,98 @@ func (b *SSEBroker) Broadcast(event string, data ...any) {
b.broadcast <- Message{Event: event, Data: payload} b.broadcast <- Message{Event: event, Data: payload}
} }
func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error { // Interface para abstraer las diferencias entre Fiber y HTTP estándar
type SSEWriter interface {
Write(data []byte) (int, error)
Flush() error
}
// Wrapper para http.ResponseWriter
type httpSSEWriter struct {
w http.ResponseWriter
}
func (h *httpSSEWriter) Write(data []byte) (int, error) {
return h.w.Write(data)
}
func (h *httpSSEWriter) Flush() error {
if flusher, ok := h.w.(http.Flusher); ok {
flusher.Flush()
}
return nil
}
// Wrapper para bufio.Writer
type fiberSSEWriter struct {
w *bufio.Writer
}
func (f *fiberSSEWriter) Write(data []byte) (int, error) {
return f.w.Write(data)
}
func (f *fiberSSEWriter) Flush() error {
return f.w.Flush()
}
func (b *SSEBroker) handleSSEConnection(clientID string, client *Client, writer SSEWriter) {
slog.Info("SSE connection established", "client_id", clientID)
defer func() {
b.UnregisterClient(clientID)
slog.Info("SSE connection closed", "client_id", clientID)
}()
fmt.Fprintf(writer, "event: client_id\n")
fmt.Fprintf(writer, "data: %s\n\n", clientID)
writer.Flush()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case message := <-client.Send:
var data string
switch v := message.Data.(type) {
case string:
data = v
case template.HTML:
data = string(v)
default:
jsonData, err := json.Marshal(v)
if err != nil {
slog.Error("error marshaling message", "error", err)
continue
}
data = string(jsonData)
}
fmt.Fprintf(writer, "event: %s\n", message.Event)
scanner := bufio.NewScanner(strings.NewReader(data))
for scanner.Scan() {
fmt.Fprintf(writer, "data: %s\n", scanner.Text())
}
fmt.Fprint(writer, "\n")
if err := writer.Flush(); err != nil {
slog.Warn("flush error, closing client", "client_id", clientID)
return
}
case <-ticker.C:
fmt.Fprintf(writer, "event: ping\n")
fmt.Fprintf(writer, "data: %s\n\n", "ping")
writer.Flush()
case <-client.Close:
return
}
}
}
func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream") c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache") c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive") c.Set("Connection", "keep-alive")
@ -131,69 +222,32 @@ func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error {
client := b.RegisterClient(clientID) client := b.RegisterClient(clientID)
c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
slog.Info("SSE connection established", "client_id", clientID) writer := &fiberSSEWriter{w: w}
b.handleSSEConnection(clientID, client, writer)
defer func() {
b.UnregisterClient(clientID)
slog.Info("SSE connection closed", "client_id", clientID)
}()
fmt.Fprintf(w, "event: client_id\n")
fmt.Fprintf(w, "data: %s\n\n", clientID)
w.Flush()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case message := <-client.Send:
var data string
switch v := message.Data.(type) {
case string:
data = v
case template.HTML:
data = string(v)
default:
jsonData, err := json.Marshal(v)
if err != nil {
slog.Error("error marshaling message", "error", err)
continue
}
data = string(jsonData)
}
fmt.Fprintf(w, "event: %s\n", message.Event)
scanner := bufio.NewScanner(strings.NewReader(data))
for scanner.Scan() {
fmt.Fprintf(w, "data: %s\n", scanner.Text())
}
fmt.Fprint(w, "\n")
if err := w.Flush(); err != nil {
slog.Warn("flush error, closing client", "client_id", clientID)
return
}
case <-ticker.C:
fmt.Fprintf(w, "event: ping\n")
fmt.Fprintf(w, "data: %s\n\n", "ping")
case <-client.Close:
return
default:
err := w.Flush()
if err != nil {
slog.Warn("error while flushing", "error", err)
return
}
}
}
})) }))
return nil return nil
} }
func (b *SSEBroker) HandleHTTPSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
clientID := uuid.New().String()
client := b.RegisterClient(clientID)
writer := &httpSSEWriter{w: w}
go func() {
b.handleSSEConnection(clientID, client, writer)
}()
<-r.Context().Done()
slog.Info("client disconnected", "client_id", clientID)
}
func (b *SSEBroker) Shutdown() { func (b *SSEBroker) Shutdown() {
close(b.done) close(b.done)
} }