mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #2460 from xiang90/raft-logger
raft: introduce logger interface
This commit is contained in:
commit
a2be25cba4
20
raft/log.go
20
raft/log.go
@ -77,7 +77,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
|
|||||||
switch {
|
switch {
|
||||||
case ci == 0:
|
case ci == 0:
|
||||||
case ci <= l.committed:
|
case ci <= l.committed:
|
||||||
log.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
|
raftLogger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
|
||||||
default:
|
default:
|
||||||
offset := index + 1
|
offset := index + 1
|
||||||
l.append(ents[ci-offset:]...)
|
l.append(ents[ci-offset:]...)
|
||||||
@ -93,7 +93,7 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
|
|||||||
return l.lastIndex()
|
return l.lastIndex()
|
||||||
}
|
}
|
||||||
if after := ents[0].Index - 1; after < l.committed {
|
if after := ents[0].Index - 1; after < l.committed {
|
||||||
log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
|
raftLogger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
|
||||||
}
|
}
|
||||||
l.unstable.truncateAndAppend(ents)
|
l.unstable.truncateAndAppend(ents)
|
||||||
return l.lastIndex()
|
return l.lastIndex()
|
||||||
@ -114,7 +114,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
|
|||||||
for _, ne := range ents {
|
for _, ne := range ents {
|
||||||
if !l.matchTerm(ne.Index, ne.Term) {
|
if !l.matchTerm(ne.Index, ne.Term) {
|
||||||
if ne.Index <= l.lastIndex() {
|
if ne.Index <= l.lastIndex() {
|
||||||
log.Printf("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]",
|
raftLogger.Infof("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]",
|
||||||
ne.Index, l.term(ne.Index), ne.Term)
|
ne.Index, l.term(ne.Index), ne.Term)
|
||||||
}
|
}
|
||||||
return ne.Index
|
return ne.Index
|
||||||
@ -174,7 +174,7 @@ func (l *raftLog) commitTo(tocommit uint64) {
|
|||||||
// never decrease commit
|
// never decrease commit
|
||||||
if l.committed < tocommit {
|
if l.committed < tocommit {
|
||||||
if l.lastIndex() < tocommit {
|
if l.lastIndex() < tocommit {
|
||||||
log.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex())
|
raftLogger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex())
|
||||||
}
|
}
|
||||||
l.committed = tocommit
|
l.committed = tocommit
|
||||||
}
|
}
|
||||||
@ -185,7 +185,7 @@ func (l *raftLog) appliedTo(i uint64) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if l.committed < i || i < l.applied {
|
if l.committed < i || i < l.applied {
|
||||||
log.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
|
raftLogger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
|
||||||
}
|
}
|
||||||
l.applied = i
|
l.applied = i
|
||||||
}
|
}
|
||||||
@ -248,7 +248,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *raftLog) restore(s pb.Snapshot) {
|
func (l *raftLog) restore(s pb.Snapshot) {
|
||||||
log.Printf("raftlog: log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
|
raftLogger.Infof("raftlog: log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
|
||||||
l.committed = s.Metadata.Index
|
l.committed = s.Metadata.Index
|
||||||
l.unstable.restore(s)
|
l.unstable.restore(s)
|
||||||
}
|
}
|
||||||
@ -264,9 +264,9 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
|
|||||||
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset))
|
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset))
|
||||||
if err == ErrCompacted {
|
if err == ErrCompacted {
|
||||||
// This should never fail because it has been checked before.
|
// This should never fail because it has been checked before.
|
||||||
log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
|
raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
|
||||||
} else if err == ErrUnavailable {
|
} else if err == ErrUnavailable {
|
||||||
log.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
|
raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
panic(err) // TODO(bdarnell)
|
panic(err) // TODO(bdarnell)
|
||||||
}
|
}
|
||||||
@ -287,10 +287,10 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
|
|||||||
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
|
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
|
||||||
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) {
|
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) {
|
||||||
if lo > hi {
|
if lo > hi {
|
||||||
log.Panicf("raft: invalid slice %d > %d", lo, hi)
|
raftLogger.Panicf("raft: invalid slice %d > %d", lo, hi)
|
||||||
}
|
}
|
||||||
length := l.lastIndex() - l.firstIndex() + 1
|
length := l.lastIndex() - l.firstIndex() + 1
|
||||||
if lo < l.firstIndex() || hi > l.firstIndex()+length {
|
if lo < l.firstIndex() || hi > l.firstIndex()+length {
|
||||||
log.Panicf("raft: slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex())
|
raftLogger.Panicf("raft: slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,11 +14,7 @@
|
|||||||
|
|
||||||
package raft
|
package raft
|
||||||
|
|
||||||
import (
|
import pb "github.com/coreos/etcd/raft/raftpb"
|
||||||
"log"
|
|
||||||
|
|
||||||
pb "github.com/coreos/etcd/raft/raftpb"
|
|
||||||
)
|
|
||||||
|
|
||||||
// unstable.entris[i] has raft log position i+unstable.offset.
|
// unstable.entris[i] has raft log position i+unstable.offset.
|
||||||
// Note that unstable.offset may be less than the highest log
|
// Note that unstable.offset may be less than the highest log
|
||||||
@ -110,7 +106,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
|
|||||||
// directly append
|
// directly append
|
||||||
u.entries = append(u.entries, ents...)
|
u.entries = append(u.entries, ents...)
|
||||||
case after < u.offset:
|
case after < u.offset:
|
||||||
log.Printf("raftlog: replace the unstable entries from index %d", after+1)
|
raftLogger.Infof("raftlog: replace the unstable entries from index %d", after+1)
|
||||||
// The log is being truncated to before our current offset
|
// The log is being truncated to before our current offset
|
||||||
// portion, so set the offset and replace the entries
|
// portion, so set the offset and replace the entries
|
||||||
u.offset = after + 1
|
u.offset = after + 1
|
||||||
@ -118,7 +114,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
|
|||||||
default:
|
default:
|
||||||
// truncate to after and copy to u.entries
|
// truncate to after and copy to u.entries
|
||||||
// then append
|
// then append
|
||||||
log.Printf("raftlog: truncate the unstable entries to index %d", after)
|
raftLogger.Infof("raftlog: truncate the unstable entries to index %d", after)
|
||||||
u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
|
u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
|
||||||
u.entries = append(u.entries, ents...)
|
u.entries = append(u.entries, ents...)
|
||||||
}
|
}
|
||||||
@ -132,10 +128,10 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
|
|||||||
// u.offset <= lo <= hi <= u.offset+len(u.offset)
|
// u.offset <= lo <= hi <= u.offset+len(u.offset)
|
||||||
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
|
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
|
||||||
if lo > hi {
|
if lo > hi {
|
||||||
log.Panicf("raft: invalid unstable.slice %d > %d", lo, hi)
|
raftLogger.Panicf("raft: invalid unstable.slice %d > %d", lo, hi)
|
||||||
}
|
}
|
||||||
upper := u.offset + uint64(len(u.entries))
|
upper := u.offset + uint64(len(u.entries))
|
||||||
if lo < u.offset || hi > upper {
|
if lo < u.offset || hi > upper {
|
||||||
log.Panicf("raft: unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
|
raftLogger.Panicf("raft: unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
126
raft/logger.go
Normal file
126
raft/logger.go
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package raft
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Logger interface {
|
||||||
|
Debug(v ...interface{})
|
||||||
|
Debugf(format string, v ...interface{})
|
||||||
|
|
||||||
|
Error(v ...interface{})
|
||||||
|
Errorf(format string, v ...interface{})
|
||||||
|
|
||||||
|
Info(v ...interface{})
|
||||||
|
Infof(format string, v ...interface{})
|
||||||
|
|
||||||
|
Warning(v ...interface{})
|
||||||
|
Warningf(format string, v ...interface{})
|
||||||
|
|
||||||
|
Fatal(v ...interface{})
|
||||||
|
Fatalf(format string, v ...interface{})
|
||||||
|
|
||||||
|
Panic(v ...interface{})
|
||||||
|
Panicf(format string, v ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetLogger(l Logger) { raftLogger = l }
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultLogger = &DefaultLogger{Logger: log.New(os.Stderr, "", 0)}
|
||||||
|
discardLogger = &DefaultLogger{Logger: log.New(ioutil.Discard, "", 0)}
|
||||||
|
raftLogger = Logger(defaultLogger)
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
calldepth = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
// DefaultLogger is a defualt implementation of the Logger interface.
|
||||||
|
type DefaultLogger struct {
|
||||||
|
*log.Logger
|
||||||
|
debug bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) EnableTimestamps() {
|
||||||
|
l.SetFlags(l.Flags() | log.Ldate | log.Ltime)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) EnableDebug() {
|
||||||
|
l.debug = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Debug(v ...interface{}) {
|
||||||
|
if l.debug {
|
||||||
|
l.Output(calldepth, header("DEBUG", fmt.Sprint(v...)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Debugf(format string, v ...interface{}) {
|
||||||
|
if l.debug {
|
||||||
|
l.Output(calldepth, header("DEBUG", fmt.Sprintf(format, v...)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Info(v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("INFO", fmt.Sprint(v...)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Infof(format string, v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("INFO", fmt.Sprintf(format, v...)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Error(v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("ERROR", fmt.Sprint(v...)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Errorf(format string, v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("ERROR", fmt.Sprintf(format, v...)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Warning(v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("WARN", fmt.Sprint(v...)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Warningf(format string, v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("WARN", fmt.Sprintf(format, v...)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Fatal(v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("FATAL", fmt.Sprint(v...)))
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Fatalf(format string, v ...interface{}) {
|
||||||
|
l.Output(calldepth, header("FATAL", fmt.Sprintf(format, v...)))
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Panic(v ...interface{}) {
|
||||||
|
l.Logger.Panic(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Panicf(format string, v ...interface{}) {
|
||||||
|
l.Logger.Panicf(format, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func header(lvl, msg string) string {
|
||||||
|
return fmt.Sprintf("%s: %s", lvl, msg)
|
||||||
|
}
|
@ -16,7 +16,6 @@ package raft
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
pb "github.com/coreos/etcd/raft/raftpb"
|
pb "github.com/coreos/etcd/raft/raftpb"
|
||||||
@ -263,13 +262,13 @@ func (n *node) run(r *raft) {
|
|||||||
if lead != r.lead {
|
if lead != r.lead {
|
||||||
if r.hasLeader() {
|
if r.hasLeader() {
|
||||||
if lead == None {
|
if lead == None {
|
||||||
log.Printf("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
|
raftLogger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
|
raftLogger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
|
||||||
}
|
}
|
||||||
propc = n.propc
|
propc = n.propc
|
||||||
} else {
|
} else {
|
||||||
log.Printf("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
|
raftLogger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
|
||||||
propc = nil
|
propc = nil
|
||||||
}
|
}
|
||||||
lead = r.lead
|
lead = r.lead
|
||||||
|
71
raft/raft.go
71
raft/raft.go
@ -17,7 +17,6 @@ package raft
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
@ -229,7 +228,7 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
|
|||||||
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
raftLogger.Infof("raft: newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
||||||
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
@ -286,10 +285,10 @@ func (r *raft) sendAppend(to uint64) {
|
|||||||
}
|
}
|
||||||
m.Snapshot = snapshot
|
m.Snapshot = snapshot
|
||||||
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
raftLogger.Infof("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
||||||
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
||||||
pr.setPendingSnapshot(sindex)
|
pr.setPendingSnapshot(sindex)
|
||||||
log.Printf("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr)
|
raftLogger.Infof("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr)
|
||||||
} else {
|
} else {
|
||||||
m.Type = pb.MsgApp
|
m.Type = pb.MsgApp
|
||||||
m.Index = pr.Next - 1
|
m.Index = pr.Next - 1
|
||||||
@ -413,7 +412,7 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
|
|||||||
r.tick = r.tickElection
|
r.tick = r.tickElection
|
||||||
r.lead = lead
|
r.lead = lead
|
||||||
r.state = StateFollower
|
r.state = StateFollower
|
||||||
log.Printf("raft: %x became follower at term %d", r.id, r.Term)
|
raftLogger.Infof("raft: %x became follower at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) becomeCandidate() {
|
func (r *raft) becomeCandidate() {
|
||||||
@ -426,7 +425,7 @@ func (r *raft) becomeCandidate() {
|
|||||||
r.tick = r.tickElection
|
r.tick = r.tickElection
|
||||||
r.Vote = r.id
|
r.Vote = r.id
|
||||||
r.state = StateCandidate
|
r.state = StateCandidate
|
||||||
log.Printf("raft: %x became candidate at term %d", r.id, r.Term)
|
raftLogger.Infof("raft: %x became candidate at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) becomeLeader() {
|
func (r *raft) becomeLeader() {
|
||||||
@ -449,7 +448,7 @@ func (r *raft) becomeLeader() {
|
|||||||
r.pendingConf = true
|
r.pendingConf = true
|
||||||
}
|
}
|
||||||
r.appendEntry(pb.Entry{Data: nil})
|
r.appendEntry(pb.Entry{Data: nil})
|
||||||
log.Printf("raft: %x became leader at term %d", r.id, r.Term)
|
raftLogger.Infof("raft: %x became leader at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) campaign() {
|
func (r *raft) campaign() {
|
||||||
@ -462,7 +461,7 @@ func (r *raft) campaign() {
|
|||||||
if i == r.id {
|
if i == r.id {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Printf("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
raftLogger.Infof("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
|
||||||
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
|
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
|
||||||
}
|
}
|
||||||
@ -470,9 +469,9 @@ func (r *raft) campaign() {
|
|||||||
|
|
||||||
func (r *raft) poll(id uint64, v bool) (granted int) {
|
func (r *raft) poll(id uint64, v bool) (granted int) {
|
||||||
if v {
|
if v {
|
||||||
log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
|
raftLogger.Infof("raft: %x received vote from %x at term %d", r.id, id, r.Term)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
|
raftLogger.Infof("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
|
||||||
}
|
}
|
||||||
if _, ok := r.votes[id]; !ok {
|
if _, ok := r.votes[id]; !ok {
|
||||||
r.votes[id] = v
|
r.votes[id] = v
|
||||||
@ -487,7 +486,7 @@ func (r *raft) poll(id uint64, v bool) (granted int) {
|
|||||||
|
|
||||||
func (r *raft) Step(m pb.Message) error {
|
func (r *raft) Step(m pb.Message) error {
|
||||||
if m.Type == pb.MsgHup {
|
if m.Type == pb.MsgHup {
|
||||||
log.Printf("raft: %x is starting a new election at term %d", r.id, r.Term)
|
raftLogger.Infof("raft: %x is starting a new election at term %d", r.id, r.Term)
|
||||||
r.campaign()
|
r.campaign()
|
||||||
r.Commit = r.raftLog.committed
|
r.Commit = r.raftLog.committed
|
||||||
return nil
|
return nil
|
||||||
@ -501,12 +500,12 @@ func (r *raft) Step(m pb.Message) error {
|
|||||||
if m.Type == pb.MsgVote {
|
if m.Type == pb.MsgVote {
|
||||||
lead = None
|
lead = None
|
||||||
}
|
}
|
||||||
log.Printf("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]",
|
raftLogger.Infof("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]",
|
||||||
r.id, r.Term, m.Type, m.From, m.Term)
|
r.id, r.Term, m.Type, m.From, m.Term)
|
||||||
r.becomeFollower(m.Term, lead)
|
r.becomeFollower(m.Term, lead)
|
||||||
case m.Term < r.Term:
|
case m.Term < r.Term:
|
||||||
// ignore
|
// ignore
|
||||||
log.Printf("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]",
|
raftLogger.Infof("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]",
|
||||||
r.id, r.Term, m.Type, m.From, m.Term)
|
r.id, r.Term, m.Type, m.From, m.Term)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -525,7 +524,7 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
r.bcastHeartbeat()
|
r.bcastHeartbeat()
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
if len(m.Entries) == 0 {
|
if len(m.Entries) == 0 {
|
||||||
log.Panicf("raft: %x stepped empty MsgProp", r.id)
|
raftLogger.Panicf("raft: %x stepped empty MsgProp", r.id)
|
||||||
}
|
}
|
||||||
for i, e := range m.Entries {
|
for i, e := range m.Entries {
|
||||||
if e.Type == pb.EntryConfChange {
|
if e.Type == pb.EntryConfChange {
|
||||||
@ -540,20 +539,20 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
case pb.MsgAppResp:
|
case pb.MsgAppResp:
|
||||||
if pr.isUnreachable() {
|
if pr.isUnreachable() {
|
||||||
pr.reachable()
|
pr.reachable()
|
||||||
log.Printf("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
if m.Reject {
|
if m.Reject {
|
||||||
log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
|
raftLogger.Infof("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
|
||||||
r.id, m.RejectHint, m.From, m.Index)
|
r.id, m.RejectHint, m.From, m.Index)
|
||||||
if pr.maybeDecrTo(m.Index, m.RejectHint) {
|
if pr.maybeDecrTo(m.Index, m.RejectHint) {
|
||||||
log.Printf("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr)
|
||||||
r.sendAppend(m.From)
|
r.sendAppend(m.From)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
oldWait := pr.shouldWait()
|
oldWait := pr.shouldWait()
|
||||||
pr.update(m.Index)
|
pr.update(m.Index)
|
||||||
if r.prs[m.From].maybeSnapshotAbort() {
|
if r.prs[m.From].maybeSnapshotAbort() {
|
||||||
log.Printf("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
if r.maybeCommit() {
|
if r.maybeCommit() {
|
||||||
r.bcastAppend()
|
r.bcastAppend()
|
||||||
@ -566,13 +565,13 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
case pb.MsgHeartbeatResp:
|
case pb.MsgHeartbeatResp:
|
||||||
if pr.isUnreachable() {
|
if pr.isUnreachable() {
|
||||||
pr.reachable()
|
pr.reachable()
|
||||||
log.Printf("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
if pr.Match < r.raftLog.lastIndex() {
|
if pr.Match < r.raftLog.lastIndex() {
|
||||||
r.sendAppend(m.From)
|
r.sendAppend(m.From)
|
||||||
}
|
}
|
||||||
case pb.MsgVote:
|
case pb.MsgVote:
|
||||||
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||||
case pb.MsgSnapStatus:
|
case pb.MsgSnapStatus:
|
||||||
@ -581,10 +580,10 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
}
|
}
|
||||||
if m.Reject {
|
if m.Reject {
|
||||||
pr.snapshotFail()
|
pr.snapshotFail()
|
||||||
log.Printf("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
} else {
|
} else {
|
||||||
pr.snapshotFinish()
|
pr.snapshotFinish()
|
||||||
log.Printf("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
// wait for the msgAppResp from the remote node before sending
|
// wait for the msgAppResp from the remote node before sending
|
||||||
// out the next msgApp
|
// out the next msgApp
|
||||||
pr.waitSet(r.electionTimeout)
|
pr.waitSet(r.electionTimeout)
|
||||||
@ -592,7 +591,7 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
case pb.MsgUnreachable:
|
case pb.MsgUnreachable:
|
||||||
if !pr.isUnreachable() {
|
if !pr.isUnreachable() {
|
||||||
pr.unreachable()
|
pr.unreachable()
|
||||||
log.Printf("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr)
|
raftLogger.Infof("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -600,7 +599,7 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
func stepCandidate(r *raft, m pb.Message) {
|
func stepCandidate(r *raft, m pb.Message) {
|
||||||
switch m.Type {
|
switch m.Type {
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
log.Printf("raft: %x no leader at term %d; dropping proposal", r.id, r.Term)
|
raftLogger.Infof("raft: %x no leader at term %d; dropping proposal", r.id, r.Term)
|
||||||
return
|
return
|
||||||
case pb.MsgApp:
|
case pb.MsgApp:
|
||||||
r.becomeFollower(r.Term, m.From)
|
r.becomeFollower(r.Term, m.From)
|
||||||
@ -612,12 +611,12 @@ func stepCandidate(r *raft, m pb.Message) {
|
|||||||
r.becomeFollower(m.Term, m.From)
|
r.becomeFollower(m.Term, m.From)
|
||||||
r.handleSnapshot(m)
|
r.handleSnapshot(m)
|
||||||
case pb.MsgVote:
|
case pb.MsgVote:
|
||||||
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
|
raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||||
case pb.MsgVoteResp:
|
case pb.MsgVoteResp:
|
||||||
gr := r.poll(m.From, !m.Reject)
|
gr := r.poll(m.From, !m.Reject)
|
||||||
log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
|
raftLogger.Infof("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
|
||||||
switch r.q() {
|
switch r.q() {
|
||||||
case gr:
|
case gr:
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
@ -632,7 +631,7 @@ func stepFollower(r *raft, m pb.Message) {
|
|||||||
switch m.Type {
|
switch m.Type {
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
if r.lead == None {
|
if r.lead == None {
|
||||||
log.Printf("raft: %x no leader at term %d; dropping proposal", r.id, r.Term)
|
raftLogger.Infof("raft: %x no leader at term %d; dropping proposal", r.id, r.Term)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m.To = r.lead
|
m.To = r.lead
|
||||||
@ -651,12 +650,12 @@ func stepFollower(r *raft, m pb.Message) {
|
|||||||
case pb.MsgVote:
|
case pb.MsgVote:
|
||||||
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
||||||
r.elapsed = 0
|
r.elapsed = 0
|
||||||
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
|
raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.Vote = m.From
|
r.Vote = m.From
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
|
||||||
} else {
|
} else {
|
||||||
log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||||
}
|
}
|
||||||
@ -667,7 +666,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
|
|||||||
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
|
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
|
||||||
} else {
|
} else {
|
||||||
log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
|
raftLogger.Infof("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
|
||||||
r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
|
r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
||||||
}
|
}
|
||||||
@ -681,11 +680,11 @@ func (r *raft) handleHeartbeat(m pb.Message) {
|
|||||||
func (r *raft) handleSnapshot(m pb.Message) {
|
func (r *raft) handleSnapshot(m pb.Message) {
|
||||||
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
||||||
if r.restore(m.Snapshot) {
|
if r.restore(m.Snapshot) {
|
||||||
log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
|
raftLogger.Infof("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, sindex, sterm)
|
r.id, r.Commit, sindex, sterm)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
||||||
} else {
|
} else {
|
||||||
log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
raftLogger.Infof("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, sindex, sterm)
|
r.id, r.Commit, sindex, sterm)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
||||||
}
|
}
|
||||||
@ -698,13 +697,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
||||||
log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
raftLogger.Infof("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||||
r.raftLog.commitTo(s.Metadata.Index)
|
r.raftLog.commitTo(s.Metadata.Index)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
raftLogger.Infof("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||||
|
|
||||||
r.raftLog.restore(s)
|
r.raftLog.restore(s)
|
||||||
@ -717,7 +716,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||||||
match = 0
|
match = 0
|
||||||
}
|
}
|
||||||
r.setProgress(n, match, next)
|
r.setProgress(n, match, next)
|
||||||
log.Printf("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n])
|
raftLogger.Infof("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n])
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -761,7 +760,7 @@ func (r *raft) delProgress(id uint64) {
|
|||||||
|
|
||||||
func (r *raft) loadState(state pb.HardState) {
|
func (r *raft) loadState(state pb.HardState) {
|
||||||
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
||||||
log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
raftLogger.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
||||||
}
|
}
|
||||||
r.raftLog.committed = state.Commit
|
r.raftLog.committed = state.Commit
|
||||||
r.Term = state.Term
|
r.Term = state.Term
|
||||||
|
@ -28,12 +28,10 @@ package raft
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"testing"
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
|
||||||
|
|
||||||
pb "github.com/coreos/etcd/raft/raftpb"
|
pb "github.com/coreos/etcd/raft/raftpb"
|
||||||
)
|
)
|
||||||
@ -294,13 +292,13 @@ func TestCandidateFallback(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFollowerElectionTimeoutRandomized(t *testing.T) {
|
func TestFollowerElectionTimeoutRandomized(t *testing.T) {
|
||||||
log.SetOutput(ioutil.Discard)
|
SetLogger(discardLogger)
|
||||||
defer log.SetOutput(os.Stderr)
|
defer SetLogger(defaultLogger)
|
||||||
testNonleaderElectionTimeoutRandomized(t, StateFollower)
|
testNonleaderElectionTimeoutRandomized(t, StateFollower)
|
||||||
}
|
}
|
||||||
func TestCandidateElectionTimeoutRandomized(t *testing.T) {
|
func TestCandidateElectionTimeoutRandomized(t *testing.T) {
|
||||||
log.SetOutput(ioutil.Discard)
|
SetLogger(discardLogger)
|
||||||
defer log.SetOutput(os.Stderr)
|
defer SetLogger(defaultLogger)
|
||||||
testNonleaderElectionTimeoutRandomized(t, StateCandidate)
|
testNonleaderElectionTimeoutRandomized(t, StateCandidate)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -335,13 +333,13 @@ func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFollowersElectioinTimeoutNonconflict(t *testing.T) {
|
func TestFollowersElectioinTimeoutNonconflict(t *testing.T) {
|
||||||
log.SetOutput(ioutil.Discard)
|
SetLogger(discardLogger)
|
||||||
defer log.SetOutput(os.Stderr)
|
defer SetLogger(defaultLogger)
|
||||||
testNonleadersElectionTimeoutNonconflict(t, StateFollower)
|
testNonleadersElectionTimeoutNonconflict(t, StateFollower)
|
||||||
}
|
}
|
||||||
func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
|
func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
|
||||||
log.SetOutput(ioutil.Discard)
|
SetLogger(discardLogger)
|
||||||
defer log.SetOutput(os.Stderr)
|
defer SetLogger(defaultLogger)
|
||||||
testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
|
testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ func (n *node) start() {
|
|||||||
n.Step(context.TODO(), m)
|
n.Step(context.TODO(), m)
|
||||||
case <-n.stopc:
|
case <-n.stopc:
|
||||||
n.Stop()
|
n.Stop()
|
||||||
log.Printf("raft.%d: stop", n.id)
|
raftLogger.Infof("raft.%d: stop", n.id)
|
||||||
n.Node = nil
|
n.Node = nil
|
||||||
close(n.stopc)
|
close(n.stopc)
|
||||||
return
|
return
|
||||||
|
@ -16,7 +16,6 @@ package raft
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
|
|
||||||
pb "github.com/coreos/etcd/raft/raftpb"
|
pb "github.com/coreos/etcd/raft/raftpb"
|
||||||
)
|
)
|
||||||
@ -70,7 +69,7 @@ func (s Status) MarshalJSON() ([]byte, error) {
|
|||||||
func (s Status) String() string {
|
func (s Status) String() string {
|
||||||
b, err := s.MarshalJSON()
|
b, err := s.MarshalJSON()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("unexpected error: %v", err)
|
raftLogger.Panicf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
return string(b)
|
return string(b)
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,6 @@ package raft
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
pb "github.com/coreos/etcd/raft/raftpb"
|
pb "github.com/coreos/etcd/raft/raftpb"
|
||||||
@ -101,7 +100,7 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
|
|||||||
return nil, ErrCompacted
|
return nil, ErrCompacted
|
||||||
}
|
}
|
||||||
if hi > ms.lastIndex()+1 {
|
if hi > ms.lastIndex()+1 {
|
||||||
log.Panicf("entries's hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
|
raftLogger.Panicf("entries's hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
|
||||||
}
|
}
|
||||||
// only contains dummy entries.
|
// only contains dummy entries.
|
||||||
if len(ms.ents) == 1 {
|
if len(ms.ents) == 1 {
|
||||||
@ -175,7 +174,7 @@ func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte)
|
|||||||
|
|
||||||
offset := ms.ents[0].Index
|
offset := ms.ents[0].Index
|
||||||
if i > ms.lastIndex() {
|
if i > ms.lastIndex() {
|
||||||
log.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
|
raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
|
||||||
}
|
}
|
||||||
|
|
||||||
ms.snapshot.Metadata.Index = i
|
ms.snapshot.Metadata.Index = i
|
||||||
@ -196,7 +195,7 @@ func (ms *MemoryStorage) Compact(compactIndex uint64) error {
|
|||||||
return ErrCompacted
|
return ErrCompacted
|
||||||
}
|
}
|
||||||
if compactIndex > ms.lastIndex() {
|
if compactIndex > ms.lastIndex() {
|
||||||
log.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
|
raftLogger.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
|
||||||
}
|
}
|
||||||
|
|
||||||
i := compactIndex - offset
|
i := compactIndex - offset
|
||||||
@ -237,7 +236,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error {
|
|||||||
case uint64(len(ms.ents)) == offset:
|
case uint64(len(ms.ents)) == offset:
|
||||||
ms.ents = append(ms.ents, entries...)
|
ms.ents = append(ms.ents, entries...)
|
||||||
default:
|
default:
|
||||||
log.Panicf("missing log entry [last: %d, append at: %d]",
|
raftLogger.Panicf("missing log entry [last: %d, append at: %d]",
|
||||||
ms.lastIndex(), entries[0].Index)
|
ms.lastIndex(), entries[0].Index)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user