fiz: correções da pool

This commit is contained in:
Júnior
2025-06-17 18:26:14 -03:00
parent 682027d517
commit 9259f36e9c
31 changed files with 373 additions and 269 deletions

View File

@ -16,8 +16,8 @@ const (
PhasePrevote = "PREVOTE"
PhasePrecommit = "PRECOMMIT"
rewardAmount = 5
MaxRoundTimeout = 10 * time.Second
rewardAmount = 5
MaxRoundTimeout = 10 * time.Second
)
func StartConsensusLoop(
@ -66,14 +66,13 @@ func StartConsensusLoop(
log.Println("🛑 Loop de consenso encerrado")
return
case <-ticker.C:
roundState.Mu.Lock()
currentRound := roundState.Round
if time.Since(roundState.LastRoundStart) > MaxRoundTimeout {
log.Println("⏰ Timeout! Reiniciando round", roundState.Round+1)
roundState.ResetRound(roundState.Round + 1)
log.Println("⏰ Timeout! Reiniciando round", currentRound+1)
roundState.ResetRound(currentRound + 1)
roundState.LastRoundStart = time.Now()
phase = PhaseProposal
roundState.Mu.Unlock()
continue
}
@ -95,7 +94,7 @@ func StartConsensusLoop(
BaseMsg: BaseMsg{
MsgType: ProposalType,
HeightVal: roundState.Height,
RoundVal: roundState.Round,
RoundVal: currentRound,
Validator: nodeID,
Time: time.Now(),
},
@ -107,15 +106,17 @@ func StartConsensusLoop(
case PhasePrevote:
log.Println("🗳️ Fase de PREVOTE")
proposalHash := roundState.Proposal
vote := PrevoteMsg{
BaseMsg: BaseMsg{
MsgType: PrevoteType,
HeightVal: roundState.Height,
RoundVal: roundState.Round,
RoundVal: currentRound,
Validator: nodeID,
Time: time.Now(),
},
BlockHash: roundState.Proposal,
BlockHash: proposalHash,
}
roundState.Prevotes[nodeID] = vote.BlockHash
broadcast(vote)
@ -123,15 +124,17 @@ func StartConsensusLoop(
case PhasePrecommit:
log.Println("🔐 Fase de PRECOMMIT")
proposalHash := roundState.Proposal
vote := PrecommitMsg{
BaseMsg: BaseMsg{
MsgType: PrecommitType,
HeightVal: roundState.Height,
RoundVal: roundState.Round,
RoundVal: currentRound,
Validator: nodeID,
Time: time.Now(),
},
BlockHash: roundState.Proposal,
BlockHash: proposalHash,
}
roundState.Precommits[nodeID] = vote.BlockHash
broadcast(vote)
@ -164,11 +167,10 @@ func StartConsensusLoop(
}
ApplySlash(roundState.Precommits, blockHash, stakingStore, validatorSet)
}
roundState.ResetRound(roundState.Round + 1)
roundState.ResetRound(currentRound + 1)
roundState.LastRoundStart = time.Now()
phase = PhaseProposal
}
roundState.Mu.Unlock()
}
}
}
}

View File

@ -1,42 +1,38 @@
package consensus
import (
"sync"
"time"
)
// RoundState mantém o estado atual da altura e rodada de consenso.
type RoundState struct {
Height uint64 // Altura atual do consenso (número do bloco)
Round uint64 // Rodada atual (tentativas por altura)
LockedBlock string // Hash do bloco "travado" (caso tenha precommit anterior)
Proposal string // Hash da proposta atual recebida
Prevotes map[string]string // Mapa[ValidatorID] = BlockHash (pode ser vazio)
Precommits map[string]string // Mapa[ValidatorID] = BlockHash
LastRoundStart time.Time // 🆕 Controle de início da rodada
Mu sync.RWMutex // Proteção de acesso concorrente
Height uint64 // Altura atual do consenso (número do bloco)
Round uint64 // Rodada atual (tentativas por altura)
LockedBlock string // Hash do bloco "travado" (caso tenha precommit anterior)
Proposal string // Hash da proposta atual recebida
Prevotes map[string]string // Mapa[ValidatorID] = BlockHash (pode ser vazio)
Precommits map[string]string // Mapa[ValidatorID] = BlockHash
LastRoundStart time.Time // Controle de início da rodada
}
// NewRoundState cria um estado novo para uma altura específica.
func NewRoundState(height uint64) *RoundState {
return &RoundState{
Height: height,
Round: 0,
LockedBlock: "",
Proposal: "",
Prevotes: make(map[string]string),
Precommits: make(map[string]string),
LastRoundStart: time.Now(),
Height: height,
Round: 0,
LockedBlock: "",
Proposal: "",
Prevotes: make(map[string]string),
Precommits: make(map[string]string),
LastRoundStart: time.Now(),
}
}
// ResetRound limpa os votos e proposta da rodada atual (usado ao iniciar nova rodada).
func (rs *RoundState) ResetRound(round uint64) {
rs.Mu.Lock()
defer rs.Mu.Unlock()
rs.Round = round
rs.Proposal = ""
rs.Prevotes = make(map[string]string)
rs.Precommits = make(map[string]string)
rs.LastRoundStart = time.Now()
}
}

View File

@ -13,8 +13,9 @@ import (
)
type HTTPTransport struct {
peers []string
handlers map[MessageType]func(ConsensusMessage)
peers []string
handlers map[MessageType]func(ConsensusMessage)
receiveChan chan ConsensusMessage // Canal para enviar mensagens recebidas
}
func NewHTTPTransport() *HTTPTransport {
@ -67,32 +68,43 @@ func (t *HTTPTransport) HandleIncoming(w http.ResponseWriter, r *http.Request) {
return
}
var msg ConsensusMessage
switch base.MsgType {
case ProposalType:
var msg ProposalMsg
_ = json.Unmarshal(bodyBytes, &msg)
if h, ok := t.handlers[ProposalType]; ok {
h(msg)
}
msg = &ProposalMsg{}
case PrevoteType:
var msg PrevoteMsg
_ = json.Unmarshal(bodyBytes, &msg)
if h, ok := t.handlers[PrevoteType]; ok {
h(msg)
}
msg = &PrevoteMsg{}
case PrecommitType:
var msg PrecommitMsg
_ = json.Unmarshal(bodyBytes, &msg)
if h, ok := t.handlers[PrecommitType]; ok {
h(msg)
}
msg = &PrecommitMsg{}
default:
log.Println("⚠️ Tipo de mensagem desconhecido:", base.MsgType)
w.WriteHeader(http.StatusBadRequest)
return
}
if err := json.Unmarshal(bodyBytes, msg); err != nil {
log.Println("❌ Erro ao decodificar mensagem:", err)
w.WriteHeader(http.StatusBadRequest)
return
}
select {
case t.receiveChan <- msg:
w.WriteHeader(http.StatusOK)
log.Printf("✅ Mensagem %s recebida de %s\n", base.MsgType, base.Validator)
default:
log.Println("⚠️ Canal de mensagens cheio - descartando mensagem")
w.WriteHeader(http.StatusServiceUnavailable)
}
w.WriteHeader(http.StatusOK)
}
// PingPeer envia um ping para o peer e espera resposta.
func (t *HTTPTransport) Receive() <-chan ConsensusMessage {
ch := make(chan ConsensusMessage, 100)
t.receiveChan = ch // Armazena o canal para uso nos handlers
return ch
}
func (t *HTTPTransport) PingPeer(peer string) error {
client := http.Client{
Timeout: 2 * time.Second,
@ -106,4 +118,4 @@ func (t *HTTPTransport) PingPeer(peer string) error {
return errors.New("resposta inválida ao ping")
}
return nil
}
}