Files
dejo-node/internal/consensus/transport_http.go
2025-05-23 10:44:32 -03:00

109 lines
2.4 KiB
Go

package consensus
import (
"bytes"
"encoding/json"
"errors"
"io"
"log"
"net/http"
"os"
"strings"
"time"
)
type HTTPTransport struct {
peers []string
handlers map[MessageType]func(ConsensusMessage)
}
func NewHTTPTransport() *HTTPTransport {
peersEnv := os.Getenv("DEJO_PEERS")
peers := []string{}
if peersEnv != "" {
peers = strings.Split(peersEnv, ",")
}
return &HTTPTransport{
peers: peers,
handlers: make(map[MessageType]func(ConsensusMessage)),
}
}
func (t *HTTPTransport) Broadcast(msg ConsensusMessage) {
data, err := json.Marshal(msg)
if err != nil {
log.Println("❌ Erro ao serializar mensagem para broadcast:", err)
return
}
for _, peer := range t.peers {
go func(peer string) {
resp, err := http.Post(peer+"/consensus", "application/json", bytes.NewReader(data))
if err != nil {
log.Println("⚠️ Erro ao enviar mensagem para", peer, "erro:", err)
return
}
resp.Body.Close()
}(peer)
}
}
func (t *HTTPTransport) Register(msgType MessageType, handler func(ConsensusMessage)) {
t.handlers[msgType] = handler
}
func (t *HTTPTransport) HandleIncoming(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
log.Println("❌ Erro ao ler corpo da mensagem:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
var base BaseMsg
if err := json.Unmarshal(bodyBytes, &base); err != nil {
log.Println("❌ Erro ao decodificar base da mensagem:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
switch base.MsgType {
case ProposalType:
var msg ProposalMsg
_ = json.Unmarshal(bodyBytes, &msg)
if h, ok := t.handlers[ProposalType]; ok {
h(msg)
}
case PrevoteType:
var msg PrevoteMsg
_ = json.Unmarshal(bodyBytes, &msg)
if h, ok := t.handlers[PrevoteType]; ok {
h(msg)
}
case PrecommitType:
var msg PrecommitMsg
_ = json.Unmarshal(bodyBytes, &msg)
if h, ok := t.handlers[PrecommitType]; ok {
h(msg)
}
default:
log.Println("⚠️ Tipo de mensagem desconhecido:", base.MsgType)
}
w.WriteHeader(http.StatusOK)
}
// PingPeer envia um ping para o peer e espera resposta.
func (t *HTTPTransport) PingPeer(peer string) error {
client := http.Client{
Timeout: 2 * time.Second,
}
resp, err := client.Get(peer + "/ping")
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.New("resposta inválida ao ping")
}
return nil
}