diff --git a/third_party/github.com/coreos/raft/append_entries_request_test.go b/third_party/github.com/coreos/raft/append_entries_request_test.go index d8cbce735..34a827f67 100644 --- a/third_party/github.com/coreos/raft/append_entries_request_test.go +++ b/third_party/github.com/coreos/raft/append_entries_request_test.go @@ -28,7 +28,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by entries := make([]*LogEntry, 0) for i := 0; i < entryCount; i++ { command := &DefaultJoinCommand{Name: "localhost:1000"} - entry, _ := newLogEntry(nil, 1, 2, command) + entry, _ := newLogEntry(nil, nil, 1, 2, command) entries = append(entries, entry) } req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries) diff --git a/third_party/github.com/coreos/raft/event.go b/third_party/github.com/coreos/raft/event.go index 8cd20bf66..649a1158c 100644 --- a/third_party/github.com/coreos/raft/event.go +++ b/third_party/github.com/coreos/raft/event.go @@ -6,6 +6,9 @@ const ( TermChangeEventType = "termChange" AddPeerEventType = "addPeer" RemovePeerEventType = "removePeer" + + HeartbeatTimeoutEventType = "heartbeatTimeout" + ElectionTimeoutThresholdEventType = "electionTimeoutThreshold" ) // Event represents an action that occurred within the Raft library. diff --git a/third_party/github.com/coreos/raft/http_transporter.go b/third_party/github.com/coreos/raft/http_transporter.go index de2fa67ac..a74a5d628 100644 --- a/third_party/github.com/coreos/raft/http_transporter.go +++ b/third_party/github.com/coreos/raft/http_transporter.go @@ -24,6 +24,7 @@ type HTTPTransporter struct { appendEntriesPath string requestVotePath string httpClient http.Client + Transport *http.Transport } type HTTPMuxer interface { @@ -38,13 +39,15 @@ type HTTPMuxer interface { // Creates a new HTTP transporter with the given path prefix. func NewHTTPTransporter(prefix string) *HTTPTransporter { - return &HTTPTransporter{ + t := &HTTPTransporter{ DisableKeepAlives: false, prefix: prefix, appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), - httpClient: http.Client{Transport: &http.Transport{DisableKeepAlives: false}}, + Transport: &http.Transport{DisableKeepAlives: false}, } + t.httpClient.Transport = t.Transport + return t } //------------------------------------------------------------------------------ @@ -99,6 +102,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) + t.Transport.ResponseHeaderTimeout = server.ElectionTimeout() httpResp, err := t.httpClient.Post(url, "application/protobuf", &b) if httpResp == nil || err != nil { traceln("transporter.ae.response.error:", err) diff --git a/third_party/github.com/coreos/raft/log.go b/third_party/github.com/coreos/raft/log.go index 88be08e98..f137ecf98 100644 --- a/third_party/github.com/coreos/raft/log.go +++ b/third_party/github.com/coreos/raft/log.go @@ -23,7 +23,6 @@ type Log struct { file *os.File path string entries []*LogEntry - results []*logResult commitIndex uint64 mutex sync.RWMutex startIndex uint64 // the index before the first entry in the Log entries @@ -162,7 +161,7 @@ func (l *Log) open(path string) error { // Read the file and decode entries. for { // Instantiate log entry and decode into it. - entry, _ := newLogEntry(l, 0, 0, nil) + entry, _ := newLogEntry(l, nil, 0, 0, nil) entry.Position, _ = l.file.Seek(0, os.SEEK_CUR) n, err := entry.decode(l.file) @@ -191,8 +190,6 @@ func (l *Log) open(path string) error { readBytes += int64(n) } - l.results = make([]*logResult, len(l.entries)) - debugln("open.log.recovery number of log ", len(l.entries)) return nil } @@ -207,7 +204,6 @@ func (l *Log) close() { l.file = nil } l.entries = make([]*LogEntry, 0) - l.results = make([]*logResult, 0) } //-------------------------------------- @@ -215,8 +211,8 @@ func (l *Log) close() { //-------------------------------------- // Creates a log entry associated with this log. -func (l *Log) createEntry(term uint64, command Command) (*LogEntry, error) { - return newLogEntry(l, l.nextIndex(), term, command) +func (l *Log) createEntry(term uint64, command Command, e *ev) (*LogEntry, error) { + return newLogEntry(l, e, l.nextIndex(), term, command) } // Retrieves an entry from the log. If the entry has been eliminated because @@ -276,35 +272,6 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]* } } -// Retrieves the return value and error for an entry. The result can only exist -// after the entry has been committed. -func (l *Log) getEntryResult(entry *LogEntry, clear bool) (interface{}, error) { - l.mutex.RLock() - defer l.mutex.RUnlock() - - if entry == nil { - panic("raft: Log entry required for error retrieval") - } - debugln("getEntryResult.result index: ", entry.Index-l.startIndex-1) - // If a result exists for the entry then return it with its error. - if entry.Index > l.startIndex && entry.Index <= l.startIndex+uint64(len(l.results)) { - if result := l.results[entry.Index-l.startIndex-1]; result != nil { - - // keep the records before remove it - returnValue, err := result.returnValue, result.err - - // Remove reference to result if it's being cleared after retrieval. - if clear { - result.returnValue = nil - } - - return returnValue, err - } - } - - return nil, nil -} - //-------------------------------------- // Commit //-------------------------------------- @@ -402,7 +369,10 @@ func (l *Log) setCommitIndex(index uint64) error { // Apply the changes to the state machine and store the error code. returnValue, err := l.ApplyFunc(command) debugln("setCommitIndex.set.result index: ", entryIndex) - l.results[entryIndex] = &logResult{returnValue: returnValue, err: err} + if entry.event != nil { + entry.event.returnValue = returnValue + entry.event.c <- err + } } return nil } @@ -443,6 +413,14 @@ func (l *Log) truncate(index uint64, term uint64) error { debugln("log.truncate.clear") l.file.Truncate(0) l.file.Seek(0, os.SEEK_SET) + + // notify clients if this node is the previous leader + for _, entry := range l.entries { + if entry.event != nil { + entry.event.c <- errors.New("command failed to be committed due to node failure") + } + } + l.entries = []*LogEntry{} } else { // Do not truncate if the entry at index does not have the matching term. @@ -458,6 +436,15 @@ func (l *Log) truncate(index uint64, term uint64) error { position := l.entries[index-l.startIndex].Position l.file.Truncate(position) l.file.Seek(position, os.SEEK_SET) + + // notify clients if this node is the previous leader + for i := index - l.startIndex; i < uint64(len(l.entries)); i++ { + entry := l.entries[i] + if entry.event != nil { + entry.event.c <- errors.New("command failed to be committed due to node failure") + } + } + l.entries = l.entries[0 : index-l.startIndex] } } @@ -529,7 +516,6 @@ func (l *Log) appendEntry(entry *LogEntry) error { // Append to entries list if stored on disk. l.entries = append(l.entries, entry) - l.results = append(l.results, nil) return nil } @@ -558,7 +544,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) { // Append to entries list if stored on disk. l.entries = append(l.entries, entry) - l.results = append(l.results, nil) return int64(size), nil } @@ -570,7 +555,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) { // compact the log before index (including index) func (l *Log) compact(index uint64, term uint64) error { var entries []*LogEntry - var results []*logResult l.mutex.Lock() defer l.mutex.Unlock() @@ -583,11 +567,9 @@ func (l *Log) compact(index uint64, term uint64) error { // we just recovery from on snapshot if index >= l.internalCurrentIndex() { entries = make([]*LogEntry, 0) - results = make([]*logResult, 0) } else { // get all log entries after index entries = l.entries[index-l.startIndex:] - results = l.results[index-l.startIndex:] } // create a new log file and add all the entries @@ -621,7 +603,6 @@ func (l *Log) compact(index uint64, term uint64) error { // compaction the in memory log l.entries = entries - l.results = results l.startIndex = index l.startTerm = term return nil diff --git a/third_party/github.com/coreos/raft/log_entry.go b/third_party/github.com/coreos/raft/log_entry.go index ff344d048..94cfb94e2 100644 --- a/third_party/github.com/coreos/raft/log_entry.go +++ b/third_party/github.com/coreos/raft/log_entry.go @@ -17,11 +17,11 @@ type LogEntry struct { CommandName string Command []byte Position int64 // position in the log file - commit chan bool + event *ev } // Creates a new log entry associated with a log. -func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntry, error) { +func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command) (*LogEntry, error) { var buf bytes.Buffer var commandName string if command != nil { @@ -41,7 +41,7 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr Term: term, CommandName: commandName, Command: buf.Bytes(), - commit: make(chan bool, 5), + event: event, } return e, nil diff --git a/third_party/github.com/coreos/raft/log_test.go b/third_party/github.com/coreos/raft/log_test.go index e890090c3..e2b53c846 100644 --- a/third_party/github.com/coreos/raft/log_test.go +++ b/third_party/github.com/coreos/raft/log_test.go @@ -30,15 +30,15 @@ func TestLogNewLog(t *testing.T) { defer log.close() defer os.Remove(path) - e, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20}) + e, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } - e, _ = newLogEntry(log, 2, 1, &testCommand2{X: 100}) + e, _ = newLogEntry(log, nil, 2, 1, &testCommand2{X: 100}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } - e, _ = newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0}) + e, _ = newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } @@ -63,9 +63,9 @@ func TestLogNewLog(t *testing.T) { // Ensure that we can decode and encode to an existing log. func TestLogExistingLog(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) log, path := setupLog([]*LogEntry{e0, e1, e2}) defer log.close() defer os.Remove(path) @@ -88,9 +88,9 @@ func TestLogExistingLog(t *testing.T) { // Ensure that we can check the contents of the log by index/term. func TestLogContainsEntries(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) log, path := setupLog([]*LogEntry{e0, e1, e2}) defer log.close() defer os.Remove(path) @@ -115,8 +115,8 @@ func TestLogContainsEntries(t *testing.T) { // Ensure that we can recover from an incomplete/corrupt log and continue logging. func TestLogRecovery(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) f, _ := ioutil.TempFile("", "raft-log-") e0.encode(f) @@ -134,7 +134,7 @@ func TestLogRecovery(t *testing.T) { defer log.close() defer os.Remove(f.Name()) - e, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bat", I: -5}) + e, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bat", I: -5}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } @@ -167,15 +167,15 @@ func TestLogTruncate(t *testing.T) { defer os.Remove(path) - entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20}) + entry1, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) if err := log.appendEntry(entry1); err != nil { t.Fatalf("Unable to append: %v", err) } - entry2, _ := newLogEntry(log, 2, 1, &testCommand2{X: 100}) + entry2, _ := newLogEntry(log, nil, 2, 1, &testCommand2{X: 100}) if err := log.appendEntry(entry2); err != nil { t.Fatalf("Unable to append: %v", err) } - entry3, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0}) + entry3, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) if err := log.appendEntry(entry3); err != nil { t.Fatalf("Unable to append: %v", err) } diff --git a/third_party/github.com/coreos/raft/peer.go b/third_party/github.com/coreos/raft/peer.go index 05a398178..1a0f20583 100644 --- a/third_party/github.com/coreos/raft/peer.go +++ b/third_party/github.com/coreos/raft/peer.go @@ -176,6 +176,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req) if resp == nil { + p.server.DispatchEvent(newEvent(HeartbeatTimeoutEventType, p, nil)) debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name) return } diff --git a/third_party/github.com/coreos/raft/server.go b/third_party/github.com/coreos/raft/server.go index 1eafa2675..4513196da 100644 --- a/third_party/github.com/coreos/raft/server.go +++ b/third_party/github.com/coreos/raft/server.go @@ -37,6 +37,11 @@ const ( DefaultElectionTimeout = 150 * time.Millisecond ) +// ElectionTimeoutThresholdPercent specifies the threshold at which the server +// will dispatch warning events that the heartbeat RTT is too close to the +// election timeout. +const ElectionTimeoutThresholdPercent = 0.8 + var stopValue interface{} //------------------------------------------------------------------------------ @@ -575,6 +580,8 @@ func (s *server) sendAsync(value interface{}) *ev { func (s *server) followerLoop() { s.setState(Follower) + since := time.Now() + electionTimeout := s.ElectionTimeout() timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) for { @@ -597,6 +604,11 @@ func (s *server) followerLoop() { err = NotLeaderError } case *AppendEntriesRequest: + // If heartbeats get too close to the election timeout then send an event. + elapsedTime := time.Now().Sub(since) + if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) { + s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil)) + } e.returnValue, update = s.processAppendEntriesRequest(req) case *RequestVoteRequest: e.returnValue, update = s.processRequestVoteRequest(req) @@ -624,6 +636,7 @@ func (s *server) followerLoop() { // 1.Receiving valid AppendEntries RPC, or // 2.Granting vote to candidate if update { + since = time.Now() timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) } @@ -824,7 +837,7 @@ func (s *server) processCommand(command Command, e *ev) { s.debugln("server.command.process") // Create an entry for the command in the log. - entry, err := s.log.createEntry(s.currentTerm, command) + entry, err := s.log.createEntry(s.currentTerm, command, e) if err != nil { s.debugln("server.command.log.entry.error:", err) @@ -838,21 +851,6 @@ func (s *server) processCommand(command Command, e *ev) { return } - // Issue a callback for the entry once it's committed. - go func() { - // Wait for the entry to be committed. - select { - case <-entry.commit: - var err error - s.debugln("server.command.commit") - e.returnValue, err = s.log.getEntryResult(entry, true) - e.c <- err - case <-time.After(time.Second): - s.debugln("server.command.timeout") - e.c <- CommandTimeoutError - } - }() - // Issue an append entries response for the server. resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()) resp.append = true @@ -957,22 +955,6 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { if commitIndex > committedIndex { s.log.setCommitIndex(commitIndex) s.debugln("commit index ", commitIndex) - for i := committedIndex; i < commitIndex; i++ { - if entry := s.log.getEntry(i + 1); entry != nil { - // if the leader is a new one and the entry came from the - // old leader, the commit channel will be nil and no go routine - // is waiting from this channel - // if we try to send to it, the new leader will get stuck - if entry.commit != nil { - select { - case entry.commit <- true: - default: - panic("server unable to send signal to commit channel") - } - entry.commit = nil - } - } - } } } diff --git a/third_party/github.com/coreos/raft/server_test.go b/third_party/github.com/coreos/raft/server_test.go index f8be91e5f..3ac36811b 100644 --- a/third_party/github.com/coreos/raft/server_test.go +++ b/third_party/github.com/coreos/raft/server_test.go @@ -111,9 +111,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { // Ensure that a vote request is denied if the log is out of date. func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) // start as a follower with term 2 and index 3 @@ -151,7 +151,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader. func TestServerPromoteSelf(t *testing.T) { - e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) + e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20}) s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0}) // start as a follower @@ -204,7 +204,7 @@ func TestServerAppendEntries(t *testing.T) { defer s.Stop() // Append single entry. - e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -215,8 +215,8 @@ func TestServerAppendEntries(t *testing.T) { } // Append multiple entries + commit the last one. - e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) - e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30}) + e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20}) + e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30}) entries = []*LogEntry{e1, e2} resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -248,7 +248,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { s.(*server).mutex.Unlock() // Append single entry. - e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 2 || resp.Success { @@ -266,8 +266,8 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { defer s.Stop() // Append single entry + commit. - e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) - e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) + e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15}) entries := []*LogEntry{e1, e2} resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -275,7 +275,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { } // Append entry again (post-commit). - e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) + e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20}) entries = []*LogEntry{e} resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries)) if resp.Term != 1 || resp.Success { @@ -289,9 +289,9 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { s.Start() defer s.Stop() - entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) - entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) - entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20}) + entry1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + entry2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15}) + entry3, _ := newLogEntry(nil, nil, 2, 2, &testCommand1{Val: "bar", I: 20}) // Append single entry + commit. entries := []*LogEntry{entry1, entry2} diff --git a/third_party/github.com/coreos/raft/test.go b/third_party/github.com/coreos/raft/test.go index dfe0a39f2..3ea9c7e3b 100644 --- a/third_party/github.com/coreos/raft/test.go +++ b/third_party/github.com/coreos/raft/test.go @@ -103,7 +103,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server { servers := []Server{} - e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) + e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20}) for _, name := range names { if lookup[name] != nil {