package consensus import ( "bytes" "encoding/json" "errors" "io" "log" "net/http" "os" "strings" "time" ) type HTTPTransport struct { peers []string handlers map[MessageType]func(ConsensusMessage) receiveChan chan ConsensusMessage // Canal para enviar mensagens recebidas } func NewHTTPTransport() *HTTPTransport { peersEnv := os.Getenv("DEJO_PEERS") peers := []string{} if peersEnv != "" { peers = strings.Split(peersEnv, ",") } return &HTTPTransport{ peers: peers, handlers: make(map[MessageType]func(ConsensusMessage)), } } func (t *HTTPTransport) Broadcast(msg ConsensusMessage) { data, err := json.Marshal(msg) if err != nil { log.Println("❌ Erro ao serializar mensagem para broadcast:", err) return } for _, peer := range t.peers { go func(peer string) { resp, err := http.Post(peer+"/consensus", "application/json", bytes.NewReader(data)) if err != nil { log.Println("⚠️ Erro ao enviar mensagem para", peer, "erro:", err) return } resp.Body.Close() }(peer) } } func (t *HTTPTransport) Register(msgType MessageType, handler func(ConsensusMessage)) { t.handlers[msgType] = handler } func (t *HTTPTransport) HandleIncoming(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() bodyBytes, err := io.ReadAll(r.Body) if err != nil { log.Println("❌ Erro ao ler corpo da mensagem:", err) w.WriteHeader(http.StatusBadRequest) return } var base BaseMsg if err := json.Unmarshal(bodyBytes, &base); err != nil { log.Println("❌ Erro ao decodificar base da mensagem:", err) w.WriteHeader(http.StatusBadRequest) return } var msg ConsensusMessage switch base.MsgType { case ProposalType: msg = &ProposalMsg{} case PrevoteType: msg = &PrevoteMsg{} case PrecommitType: msg = &PrecommitMsg{} default: log.Println("⚠️ Tipo de mensagem desconhecido:", base.MsgType) w.WriteHeader(http.StatusBadRequest) return } if err := json.Unmarshal(bodyBytes, msg); err != nil { log.Println("❌ Erro ao decodificar mensagem:", err) w.WriteHeader(http.StatusBadRequest) return } select { case t.receiveChan <- msg: w.WriteHeader(http.StatusOK) log.Printf("✅ Mensagem %s recebida de %s\n", base.MsgType, base.Validator) default: log.Println("⚠️ Canal de mensagens cheio - descartando mensagem") w.WriteHeader(http.StatusServiceUnavailable) } } // PingPeer envia um ping para o peer e espera resposta. func (t *HTTPTransport) Receive() <-chan ConsensusMessage { ch := make(chan ConsensusMessage, 100) t.receiveChan = ch // Armazena o canal para uso nos handlers return ch } func (t *HTTPTransport) PingPeer(peer string) error { client := http.Client{ Timeout: 2 * time.Second, } resp, err := client.Get(peer + "/ping") if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return errors.New("resposta inválida ao ping") } return nil }