Files
dejo-node/internal/consensus/transport_http.go
2025-06-17 18:26:14 -03:00

122 lines
2.9 KiB
Go

package consensus
import (
"bytes"
"encoding/json"
"errors"
"io"
"log"
"net/http"
"os"
"strings"
"time"
)
type HTTPTransport struct {
peers []string
handlers map[MessageType]func(ConsensusMessage)
receiveChan chan ConsensusMessage // Canal para enviar mensagens recebidas
}
func NewHTTPTransport() *HTTPTransport {
peersEnv := os.Getenv("DEJO_PEERS")
peers := []string{}
if peersEnv != "" {
peers = strings.Split(peersEnv, ",")
}
return &HTTPTransport{
peers: peers,
handlers: make(map[MessageType]func(ConsensusMessage)),
}
}
func (t *HTTPTransport) Broadcast(msg ConsensusMessage) {
data, err := json.Marshal(msg)
if err != nil {
log.Println("❌ Erro ao serializar mensagem para broadcast:", err)
return
}
for _, peer := range t.peers {
go func(peer string) {
resp, err := http.Post(peer+"/consensus", "application/json", bytes.NewReader(data))
if err != nil {
log.Println("⚠️ Erro ao enviar mensagem para", peer, "erro:", err)
return
}
resp.Body.Close()
}(peer)
}
}
func (t *HTTPTransport) Register(msgType MessageType, handler func(ConsensusMessage)) {
t.handlers[msgType] = handler
}
func (t *HTTPTransport) HandleIncoming(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
log.Println("❌ Erro ao ler corpo da mensagem:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
var base BaseMsg
if err := json.Unmarshal(bodyBytes, &base); err != nil {
log.Println("❌ Erro ao decodificar base da mensagem:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
var msg ConsensusMessage
switch base.MsgType {
case ProposalType:
msg = &ProposalMsg{}
case PrevoteType:
msg = &PrevoteMsg{}
case PrecommitType:
msg = &PrecommitMsg{}
default:
log.Println("⚠️ Tipo de mensagem desconhecido:", base.MsgType)
w.WriteHeader(http.StatusBadRequest)
return
}
if err := json.Unmarshal(bodyBytes, msg); err != nil {
log.Println("❌ Erro ao decodificar mensagem:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
select {
case t.receiveChan <- msg:
w.WriteHeader(http.StatusOK)
log.Printf("✅ Mensagem %s recebida de %s\n", base.MsgType, base.Validator)
default:
log.Println("⚠️ Canal de mensagens cheio - descartando mensagem")
w.WriteHeader(http.StatusServiceUnavailable)
}
}
// PingPeer envia um ping para o peer e espera resposta.
func (t *HTTPTransport) Receive() <-chan ConsensusMessage {
ch := make(chan ConsensusMessage, 100)
t.receiveChan = ch // Armazena o canal para uso nos handlers
return ch
}
func (t *HTTPTransport) PingPeer(peer string) error {
client := http.Client{
Timeout: 2 * time.Second,
}
resp, err := client.Get(peer + "/ping")
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.New("resposta inválida ao ping")
}
return nil
}