package goblocks

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"html/template"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/google/uuid"
	"github.com/valyala/fasthttp"
)

const (
	// clientSendBuffer is the number of messages queued per client before the
	// broker treats the client as too slow and drops it.
	clientSendBuffer = 10
	// pingInterval is how often a keep-alive "ping" event is written to an
	// otherwise idle connection.
	pingInterval = time.Minute
)

// Message is one server-sent event: an event name plus its payload.
type Message struct {
	Event string `json:"event"`
	Data  any    `json:"data"`
}

// Client is a single SSE subscriber known to the broker.
type Client struct {
	ID string
	// Send carries the messages queued for this client. The broker closes it
	// when the client is removed.
	Send chan Message
	// Close is closed by the broker to tell the connection handler to stop.
	Close chan struct{}
}

// SSEBroker fans broadcast messages out to every registered client. All
// changes to the client set are made by the run goroutine.
type SSEBroker struct {
	clients    map[string]*Client
	mu         sync.RWMutex
	register   chan *Client
	unregister chan *Client
	broadcast  chan Message
	done       chan struct{}

	// shutdownOnce makes Shutdown safe to call more than once.
	shutdownOnce sync.Once
}

// newSSEBroker creates a broker and starts its run loop in a new goroutine.
// Call Shutdown to stop that goroutine.
func newSSEBroker() *SSEBroker {
	b := &SSEBroker{
		clients:    make(map[string]*Client),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		broadcast:  make(chan Message),
		done:       make(chan struct{}),
	}
	go b.run()
	return b
}

// removeLocked deletes c from the client set and closes both of its channels.
// b.mu must be held for writing, and c must currently be in the set so that
// its channels are closed exactly once.
func (b *SSEBroker) removeLocked(c *Client) {
	delete(b.clients, c.ID)
	close(c.Send)
	close(c.Close)
}

// run is the broker's event loop. It serializes registration, unregistration
// and broadcasting, and returns once Shutdown has been called, after closing
// every remaining client so that their connection handlers stop.
func (b *SSEBroker) run() {
	for {
		select {
		case client := <-b.register:
			b.mu.Lock()
			// Re-registering an ID replaces the previous client; close the old
			// one so its handler does not linger unreachable.
			if old, ok := b.clients[client.ID]; ok && old != client {
				b.removeLocked(old)
			}
			b.clients[client.ID] = client
			b.mu.Unlock()
			slog.Info("client registered", "client_id", client.ID)
			slog.Debug("clients", "clients", b.clients)

		case req := <-b.unregister:
			// Only req.ID is used: the client actually stored under that ID is
			// the one removed, so a request for an unknown ID is a no-op.
			b.mu.Lock()
			if registered, ok := b.clients[req.ID]; ok {
				b.removeLocked(registered)
				slog.Info("client unregistered", "client_id", req.ID)
			}
			b.mu.Unlock()

		case message := <-b.broadcast:
			// Sends are non-blocking, so holding the write lock for the whole
			// fan-out is cheap. Deleting from a map while ranging over it is
			// permitted in Go.
			b.mu.Lock()
			for _, client := range b.clients {
				select {
				case client.Send <- message:
				default:
					// The client's queue is full: disconnect it.
					b.removeLocked(client)
					slog.Warn("client removed due to full channel", "client_id", client.ID)
				}
			}
			b.mu.Unlock()

		case <-b.done:
			b.mu.Lock()
			for _, client := range b.clients {
				b.removeLocked(client)
			}
			b.mu.Unlock()
			return
		}
	}
}

// RegisterClient creates a client with the given ID and hands it to the
// broker. If the broker has been shut down, the returned client's channels
// are already closed, so a connection handler using it returns immediately.
func (b *SSEBroker) RegisterClient(clientID string) *Client {
	client := &Client{
		ID:    clientID,
		Send:  make(chan Message, clientSendBuffer),
		Close: make(chan struct{}),
	}
	select {
	case b.register <- client:
	case <-b.done:
		close(client.Send)
		close(client.Close)
	}
	return client
}

// UnregisterClient asks the broker to remove the client with the given ID and
// close its channels. Unknown IDs are ignored, and the call returns without
// blocking forever once the broker has been shut down.
func (b *SSEBroker) UnregisterClient(clientID string) {
	// The lookup is done inside run rather than here: RegisterClient returns
	// as soon as run has received the client, which may be before it is
	// inserted, so a check made here could miss a just-registered client.
	select {
	case b.unregister <- &Client{ID: clientID}:
	case <-b.done:
	}
}

// Broadcast queues an event for every registered client. Only the first
// element of data is used as the payload; with no data the payload is nil.
// After Shutdown the message is dropped.
func (b *SSEBroker) Broadcast(event string, data ...any) {
	var payload any
	if len(data) > 0 {
		payload = data[0]
	}
	select {
	case b.broadcast <- Message{Event: event, Data: payload}:
	case <-b.done:
	}
}

// SSEWriter abstracts the differences between Fiber and the standard HTTP
// library: something that can be written to and flushed.
type SSEWriter interface {
	Write(data []byte) (int, error)
	Flush() error
}

var (
	_ SSEWriter = (*httpSSEWriter)(nil)
	_ SSEWriter = (*fiberSSEWriter)(nil)
)

// httpSSEWriter adapts an http.ResponseWriter to SSEWriter.
type httpSSEWriter struct {
	w http.ResponseWriter
}

// Write forwards data to the underlying http.ResponseWriter.
func (h *httpSSEWriter) Write(data []byte) (int, error) {
	return h.w.Write(data)
}

// Flush pushes buffered data to the client and reports the flush error, if
// any. A ResponseWriter that cannot flush is tolerated (nil is returned), so
// delivery stays best-effort in that case.
func (h *httpSSEWriter) Flush() error {
	err := http.NewResponseController(h.w).Flush()
	if errors.Is(err, http.ErrNotSupported) {
		return nil
	}
	return err
}

// fiberSSEWriter adapts the *bufio.Writer of a fasthttp body stream to
// SSEWriter.
type fiberSSEWriter struct {
	w *bufio.Writer
}

// Write forwards data to the underlying buffered writer.
func (f *fiberSSEWriter) Write(data []byte) (int, error) {
	return f.w.Write(data)
}

// Flush flushes the underlying buffered writer.
func (f *fiberSSEWriter) Flush() error {
	return f.w.Flush()
}

// encodeSSEData converts a message payload to the text sent in the data
// field: strings and template.HTML are used as-is, anything else is encoded
// as JSON.
func encodeSSEData(payload any) (string, error) {
	switch v := payload.(type) {
	case string:
		return v, nil
	case template.HTML:
		return string(v), nil
	default:
		raw, err := json.Marshal(v)
		if err != nil {
			return "", err
		}
		return string(raw), nil
	}
}

// writeSSEEvent writes one complete event frame to w and flushes it. Each
// line of data becomes its own "data:" field. Empty data still produces one
// empty "data:" field, because an event without any data field is not
// dispatched by SSE clients.
func writeSSEEvent(w SSEWriter, event, data string) error {
	var frame strings.Builder
	frame.Grow(len(event) + len(data) + 32)

	frame.WriteString("event: ")
	frame.WriteString(event)
	frame.WriteByte('\n')

	// One trailing newline terminates the last line rather than starting a
	// new, empty one.
	for _, line := range strings.Split(strings.TrimSuffix(data, "\n"), "\n") {
		frame.WriteString("data: ")
		frame.WriteString(strings.TrimSuffix(line, "\r"))
		frame.WriteByte('\n')
	}
	frame.WriteByte('\n')

	if _, err := w.Write([]byte(frame.String())); err != nil {
		return fmt.Errorf("writing SSE frame: %w", err)
	}
	return w.Flush()
}

// handleSSEConnection streams events to one client until the broker closes
// the client or a write/flush fails. It first announces the client's ID in a
// "client_id" event, then forwards every queued message and sends a "ping"
// event every pingInterval. The client is unregistered on return.
func (b *SSEBroker) handleSSEConnection(clientID string, client *Client, writer SSEWriter) {
	slog.Info("SSE connection established", "client_id", clientID)
	defer func() {
		b.UnregisterClient(clientID)
		slog.Info("SSE connection closed", "client_id", clientID)
	}()

	if err := writeSSEEvent(writer, "client_id", clientID); err != nil {
		slog.Warn("write error, closing client", "client_id", clientID, "error", err)
		return
	}

	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()

	for {
		select {
		case message, ok := <-client.Send:
			if !ok {
				// The broker closed the queue; without this check the closed
				// channel would keep yielding zero-value messages.
				return
			}
			data, err := encodeSSEData(message.Data)
			if err != nil {
				slog.Error("error marshaling message", "error", err)
				continue
			}
			if err := writeSSEEvent(writer, message.Event, data); err != nil {
				slog.Warn("write error, closing client", "client_id", clientID, "error", err)
				return
			}

		case <-ticker.C:
			if err := writeSSEEvent(writer, "ping", "ping"); err != nil {
				slog.Warn("write error, closing client", "client_id", clientID, "error", err)
				return
			}

		case <-client.Close:
			return
		}
	}
}

// HandleFiberCtxSSE is the Fiber handler for the SSE endpoint. It registers a
// new client under a random UUID and streams events through the response
// body stream writer.
func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error {
	c.Set("Content-Type", "text/event-stream")
	c.Set("Cache-Control", "no-cache")
	c.Set("Connection", "keep-alive")
	c.Set("Transfer-Encoding", "chunked")

	clientID := uuid.New().String()
	client := b.RegisterClient(clientID)

	// The closure captures only clientID and client, never c.
	c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
		writer := &fiberSSEWriter{w: w}
		b.handleSSEConnection(clientID, client, writer)
	}))

	return nil
}

// HandleHTTPSSE is the net/http handler for the SSE endpoint. It registers a
// new client under a random UUID and streams events until the request
// context is cancelled, the broker drops the client, or a write fails.
func (b *SSEBroker) HandleHTTPSSE(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Transfer-Encoding", "chunked")

	clientID := uuid.New().String()
	client := b.RegisterClient(clientID)
	writer := &httpSSEWriter{w: w}

	// Stream on the handler's own goroutine: a ResponseWriter must not be
	// used after the handler returns. The watcher below turns a cancelled
	// request into an unregistration, which closes client.Close and ends the
	// streaming loop.
	stop := make(chan struct{})
	defer close(stop)
	go func() {
		select {
		case <-r.Context().Done():
			b.UnregisterClient(clientID)
		case <-stop:
		}
	}()

	b.handleSSEConnection(clientID, client, writer)
	slog.Info("client disconnected", "client_id", clientID)
}

// Shutdown stops the broker's run loop, which closes every connected client.
// It is safe to call more than once.
func (b *SSEBroker) Shutdown() {
	b.shutdownOnce.Do(func() {
		close(b.done)
	})
}