go-blocks/broker.go

200 lines
4.1 KiB
Go

package goblocks
import (
"bufio"
"encoding/json"
"fmt"
"html/template"
"log/slog"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
type Message struct {
Event string `json:"event"`
Data any `json:"data"`
}
type Client struct {
ID string
Send chan Message
Close chan struct{}
}
type SSEBroker struct {
clients map[string]*Client
mu sync.RWMutex
register chan *Client
unregister chan *Client
broadcast chan Message
done chan struct{}
}
func newSSEBroker() *SSEBroker {
b := &SSEBroker{
clients: make(map[string]*Client),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan Message),
done: make(chan struct{}),
}
go b.run()
return b
}
func (b *SSEBroker) run() {
for {
select {
case client := <-b.register:
b.mu.Lock()
b.clients[client.ID] = client
b.mu.Unlock()
slog.Info("client registered", "client_id", client.ID)
slog.Debug("clients", "clients", b.clients)
case client := <-b.unregister:
b.mu.Lock()
if _, ok := b.clients[client.ID]; ok {
delete(b.clients, client.ID)
close(client.Send)
close(client.Close)
slog.Info("client unregistered", "client_id", client.ID)
}
b.mu.Unlock()
case message := <-b.broadcast:
b.mu.RLock()
for _, client := range b.clients {
select {
case client.Send <- message:
default:
// Si el canal está lleno, se desconecta el cliente
close(client.Send)
close(client.Close)
b.mu.RUnlock()
b.mu.Lock()
delete(b.clients, client.ID)
b.mu.Unlock()
b.mu.RLock()
slog.Warn("client removed due to full channel", "client_id", client.ID)
}
}
b.mu.RUnlock()
case <-b.done:
return
}
}
}
func (b *SSEBroker) RegisterClient(clientID string) *Client {
client := &Client{
ID: clientID,
Send: make(chan Message, 10),
Close: make(chan struct{}),
}
b.register <- client
return client
}
func (b *SSEBroker) UnregisterClient(clientID string) {
b.mu.RLock()
if client, ok := b.clients[clientID]; ok {
b.mu.RUnlock()
b.unregister <- client
} else {
b.mu.RUnlock()
}
}
func (b *SSEBroker) Broadcast(event string, data ...any) {
var payload any
if len(data) > 0 {
payload = data[0]
}
b.broadcast <- Message{Event: event, Data: payload}
}
func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
clientID := uuid.New().String()
client := b.RegisterClient(clientID)
c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
slog.Info("SSE connection established", "client_id", clientID)
defer func() {
b.UnregisterClient(clientID)
slog.Info("SSE connection closed", "client_id", clientID)
}()
fmt.Fprintf(w, "event: client_id\n")
fmt.Fprintf(w, "data: %s\n\n", clientID)
w.Flush()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case message := <-client.Send:
var data string
switch v := message.Data.(type) {
case string:
data = v
case template.HTML:
data = string(v)
default:
jsonData, err := json.Marshal(v)
if err != nil {
slog.Error("error marshaling message", "error", err)
continue
}
data = string(jsonData)
}
fmt.Fprintf(w, "event: %s\n", message.Event)
scanner := bufio.NewScanner(strings.NewReader(data))
for scanner.Scan() {
fmt.Fprintf(w, "data: %s\n", scanner.Text())
}
fmt.Fprint(w, "\n")
if err := w.Flush(); err != nil {
slog.Warn("flush error, closing client", "client_id", clientID)
return
}
case <-ticker.C:
fmt.Fprintf(w, "event: ping\n")
fmt.Fprintf(w, "data: %s\n\n", "ping")
case <-client.Close:
return
default:
err := w.Flush()
if err != nil {
slog.Warn("error while flushing", "error", err)
return
}
}
}
}))
return nil
}
func (b *SSEBroker) Shutdown() {
close(b.done)
}