122 lines
2.9 KiB
Go
122 lines
2.9 KiB
Go
package consensus
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type HTTPTransport struct {
|
|
peers []string
|
|
handlers map[MessageType]func(ConsensusMessage)
|
|
receiveChan chan ConsensusMessage // Canal para enviar mensagens recebidas
|
|
}
|
|
|
|
func NewHTTPTransport() *HTTPTransport {
|
|
peersEnv := os.Getenv("DEJO_PEERS")
|
|
peers := []string{}
|
|
if peersEnv != "" {
|
|
peers = strings.Split(peersEnv, ",")
|
|
}
|
|
return &HTTPTransport{
|
|
peers: peers,
|
|
handlers: make(map[MessageType]func(ConsensusMessage)),
|
|
}
|
|
}
|
|
|
|
func (t *HTTPTransport) Broadcast(msg ConsensusMessage) {
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
log.Println("❌ Erro ao serializar mensagem para broadcast:", err)
|
|
return
|
|
}
|
|
for _, peer := range t.peers {
|
|
go func(peer string) {
|
|
resp, err := http.Post(peer+"/consensus", "application/json", bytes.NewReader(data))
|
|
if err != nil {
|
|
log.Println("⚠️ Erro ao enviar mensagem para", peer, "erro:", err)
|
|
return
|
|
}
|
|
resp.Body.Close()
|
|
}(peer)
|
|
}
|
|
}
|
|
|
|
func (t *HTTPTransport) Register(msgType MessageType, handler func(ConsensusMessage)) {
|
|
t.handlers[msgType] = handler
|
|
}
|
|
|
|
func (t *HTTPTransport) HandleIncoming(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
bodyBytes, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
log.Println("❌ Erro ao ler corpo da mensagem:", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var base BaseMsg
|
|
if err := json.Unmarshal(bodyBytes, &base); err != nil {
|
|
log.Println("❌ Erro ao decodificar base da mensagem:", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var msg ConsensusMessage
|
|
switch base.MsgType {
|
|
case ProposalType:
|
|
msg = &ProposalMsg{}
|
|
case PrevoteType:
|
|
msg = &PrevoteMsg{}
|
|
case PrecommitType:
|
|
msg = &PrecommitMsg{}
|
|
default:
|
|
log.Println("⚠️ Tipo de mensagem desconhecido:", base.MsgType)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if err := json.Unmarshal(bodyBytes, msg); err != nil {
|
|
log.Println("❌ Erro ao decodificar mensagem:", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case t.receiveChan <- msg:
|
|
w.WriteHeader(http.StatusOK)
|
|
log.Printf("✅ Mensagem %s recebida de %s\n", base.MsgType, base.Validator)
|
|
default:
|
|
log.Println("⚠️ Canal de mensagens cheio - descartando mensagem")
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
}
|
|
}
|
|
|
|
// PingPeer envia um ping para o peer e espera resposta.
|
|
func (t *HTTPTransport) Receive() <-chan ConsensusMessage {
|
|
ch := make(chan ConsensusMessage, 100)
|
|
t.receiveChan = ch // Armazena o canal para uso nos handlers
|
|
return ch
|
|
}
|
|
|
|
func (t *HTTPTransport) PingPeer(peer string) error {
|
|
client := http.Client{
|
|
Timeout: 2 * time.Second,
|
|
}
|
|
resp, err := client.Get(peer + "/ping")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
return errors.New("resposta inválida ao ping")
|
|
}
|
|
return nil
|
|
}
|