diff --git a/etcdserver/force_cluster.go b/etcdserver/raft.go similarity index 63% rename from etcdserver/force_cluster.go rename to etcdserver/raft.go index 0c9b38c89..d7d69a2fd 100644 --- a/etcdserver/force_cluster.go +++ b/etcdserver/raft.go @@ -1,5 +1,5 @@ /* - Copyright 2014 CoreOS, Inc. + Copyright 2015 CoreOS, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,16 +19,107 @@ package etcdserver import ( "encoding/json" "log" + "os" "sort" + "time" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" ) +type RaftTimer interface { + Index() uint64 + Term() uint64 +} + +type raftNode struct { + raft.Node + + // config + snapCount uint64 // number of entries to trigger a snapshot + + // utility + ticker <-chan time.Time + raftStorage *raft.MemoryStorage + storage Storage + // transport specifies the transport to send and receive msgs to members. + // Sending messages MUST NOT block. It is okay to drop messages, since + // clients should timeout and reissue their messages. + // If transport is nil, server will panic. + transport rafthttp.Transporter + + // Cache of the latest raft index and raft term the server has seen + index uint64 + term uint64 + lead uint64 +} + +// for testing +func (r *raftNode) pauseSending() { + p := r.transport.(rafthttp.Pausable) + p.Pause() +} + +func (r *raftNode) resumeSending() { + p := r.transport.(rafthttp.Pausable) + p.Resume() +} + +func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { + var err error + member := cfg.Cluster.MemberByName(cfg.Name) + metadata := pbutil.MustMarshal( + &pb.Metadata{ + NodeID: uint64(member.ID), + ClusterID: uint64(cfg.Cluster.ID()), + }, + ) + if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { + log.Fatalf("etcdserver create snapshot directory error: %v", err) + } + if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { + log.Fatalf("etcdserver: create wal error: %v", err) + } + peers := make([]raft.Peer, len(ids)) + for i, id := range ids { + ctx, err := json.Marshal((*cfg.Cluster).Member(id)) + if err != nil { + log.Panicf("marshal member should never fail: %v", err) + } + peers[i] = raft.Peer{ID: uint64(id), Context: ctx} + } + id = member.ID + log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) + s = raft.NewMemoryStorage() + n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s) + return +} + +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) + cfg.Cluster.SetID(cid) + + log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) + s := raft.NewMemoryStorage() + if snapshot != nil { + s.ApplySnapshot(*snapshot) + } + s.SetHardState(st) + s.Append(ents) + n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s) + return id, n, s, w +} + func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { diff --git a/etcdserver/force_cluster_test.go b/etcdserver/raft_test.go similarity index 100% rename from etcdserver/force_cluster_test.go rename to etcdserver/raft_test.go diff --git a/etcdserver/server.go b/etcdserver/server.go index 3c2dbc117..a510b649d 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -23,7 +23,6 @@ import ( "log" "math/rand" "net/http" - "os" "path" "regexp" "sort" @@ -46,7 +45,6 @@ import ( "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" - "github.com/coreos/etcd/wal/walpb" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -115,14 +113,12 @@ type Server interface { UpdateMember(ctx context.Context, updateMemb Member) error } -type RaftTimer interface { - Index() uint64 - Term() uint64 -} - // EtcdServer is the production implementation of the Server interface type EtcdServer struct { - cfg *ServerConfig + cfg *ServerConfig + + r raftNode + w wait.Wait stop chan struct{} done chan struct{} @@ -132,32 +128,13 @@ type EtcdServer struct { Cluster *Cluster - node raft.Node - raftStorage *raft.MemoryStorage - storage Storage - store store.Store stats *stats.ServerStats lstats *stats.LeaderStats - // transport specifies the transport to send and receive msgs to members. - // Sending messages MUST NOT block. It is okay to drop messages, since - // clients should timeout and reissue their messages. - // If transport is nil, server will panic. - transport rafthttp.Transporter - - Ticker <-chan time.Time SyncTicker <-chan time.Time - snapCount uint64 // number of entries to trigger a snapshot - - // Cache of the latest raft index and raft term the server has seen - raftIndex uint64 - raftTerm uint64 - - raftLead uint64 - reqIDGen *idutil.Generator } @@ -254,21 +231,23 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { lstats := stats.NewLeaderStats(id.String()) srv := &EtcdServer{ - cfg: cfg, - errorc: make(chan error, 1), - store: st, - node: n, - raftStorage: s, - id: id, - attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - Cluster: cfg.Cluster, - storage: NewStorage(w, ss), - stats: sstats, - lstats: lstats, - Ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), - SyncTicker: time.Tick(500 * time.Millisecond), - snapCount: cfg.SnapCount, - reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), + cfg: cfg, + errorc: make(chan error, 1), + store: st, + r: raftNode{ + Node: n, + snapCount: cfg.SnapCount, + ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), + raftStorage: s, + storage: NewStorage(w, ss), + }, + id: id, + attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + Cluster: cfg.Cluster, + stats: sstats, + lstats: lstats, + SyncTicker: time.Tick(500 * time.Millisecond), + reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), } tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats) @@ -278,7 +257,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { tr.AddPeer(m.ID, m.PeerURLs) } } - srv.transport = tr + srv.r.transport = tr return srv, nil } @@ -295,9 +274,9 @@ func (s *EtcdServer) Start() { // modify a server's fields after it has been sent to Start. // This function is just used for testing. func (s *EtcdServer) start() { - if s.snapCount == 0 { + if s.r.snapCount == 0 { log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount) - s.snapCount = DefaultSnapCount + s.r.snapCount = DefaultSnapCount } s.w = wait.New() s.done = make(chan struct{}) @@ -328,7 +307,7 @@ func (s *EtcdServer) purgeFile() { func (s *EtcdServer) ID() types.ID { return s.id } -func (s *EtcdServer) RaftHandler() http.Handler { return s.transport.Handler() } +func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.Cluster.IsIDRemoved(types.ID(m.From)) { @@ -338,7 +317,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if m.Type == raftpb.MsgApp { s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size()) } - return s.node.Step(ctx, m) + return s.r.Step(ctx, m) } func (s *EtcdServer) run() { @@ -346,7 +325,7 @@ func (s *EtcdServer) run() { var shouldstop bool // load initial state from raft storage - snap, err := s.raftStorage.Snapshot() + snap, err := s.r.raftStorage.Snapshot() if err != nil { log.Panicf("etcdserver: get snapshot from raft storage error: %v", err) } @@ -356,20 +335,21 @@ func (s *EtcdServer) run() { confState := snap.Metadata.ConfState defer func() { - s.node.Stop() - s.transport.Stop() - if err := s.storage.Close(); err != nil { + s.r.Stop() + s.r.transport.Stop() + if err := s.r.storage.Close(); err != nil { log.Panicf("etcdserver: close storage error: %v", err) } close(s.done) }() + // TODO: make raft loop a method on raftNode for { select { - case <-s.Ticker: - s.node.Tick() - case rd := <-s.node.Ready(): + case <-s.r.ticker: + s.r.Tick() + case rd := <-s.r.Ready(): if rd.SoftState != nil { - atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead) + atomic.StoreUint64(&s.r.lead, rd.SoftState.Lead) if rd.RaftState == raft.StateLeader { syncC = s.SyncTicker // TODO: remove the nil checking @@ -384,18 +364,18 @@ func (s *EtcdServer) run() { // apply snapshot to storage if it is more updated than current snapi if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > snapi { - if err := s.storage.SaveSnap(rd.Snapshot); err != nil { + if err := s.r.storage.SaveSnap(rd.Snapshot); err != nil { log.Fatalf("etcdserver: save snapshot error: %v", err) } - s.raftStorage.ApplySnapshot(rd.Snapshot) + s.r.raftStorage.ApplySnapshot(rd.Snapshot) snapi = rd.Snapshot.Metadata.Index log.Printf("etcdserver: saved incoming snapshot at index %d", snapi) } - if err := s.storage.Save(rd.HardState, rd.Entries); err != nil { + if err := s.r.storage.Save(rd.HardState, rd.Entries); err != nil { log.Fatalf("etcdserver: save state and entries error: %v", err) } - s.raftStorage.Append(rd.Entries) + s.r.raftStorage.Append(rd.Entries) s.send(rd.Messages) @@ -427,9 +407,9 @@ func (s *EtcdServer) run() { } } - s.node.Advance() + s.r.Advance() - if appliedi-snapi > s.snapCount { + if appliedi-snapi > s.r.snapCount { log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi) s.snapshot(appliedi, &confState) snapi = appliedi @@ -486,7 +466,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { return Response{}, err } ch := s.w.Register(r.ID) - s.node.Propose(ctx, data) + s.r.Propose(ctx, data) select { case x := <-ch: resp := x.(Response) @@ -526,7 +506,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() } func (s *EtcdServer) LeaderStats() []byte { - lead := atomic.LoadUint64(&s.raftLead) + lead := atomic.LoadUint64(&s.r.lead) if lead != uint64(s.id) { return nil } @@ -571,14 +551,14 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { } // Implement the RaftTimer interface -func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) } +func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) } -func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) } +func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) } // Only for testing purpose // TODO: add Raft server interface to expose raft related info: // Index, Term, Lead, Committed, Applied, LastIndex, etc. -func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) } +func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) } func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) } @@ -588,7 +568,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) } func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { cc.ID = s.reqIDGen.Next() ch := s.w.Register(cc.ID) - if err := s.node.ProposeConfChange(ctx, cc); err != nil { + if err := s.r.ProposeConfChange(ctx, cc); err != nil { s.w.Trigger(cc.ID, nil) return err } @@ -623,7 +603,7 @@ func (s *EtcdServer) sync(timeout time.Duration) { // There is no promise that node has leader when do SYNC request, // so it uses goroutine to propose. go func() { - s.node.Propose(ctx, data) + s.r.Propose(ctx, data) cancel() }() } @@ -668,7 +648,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) { m.To = 0 } } - s.transport.Send(ms) + s.r.transport.Send(ms) } // apply takes entries received from Raft (after it has been committed) and @@ -693,8 +673,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint default: log.Panicf("entry type should be either EntryNormal or EntryConfChange") } - atomic.StoreUint64(&s.raftIndex, e.Index) - atomic.StoreUint64(&s.raftTerm, e.Term) + atomic.StoreUint64(&s.r.index, e.Index) + atomic.StoreUint64(&s.r.term, e.Term) applied = e.Index } return applied, shouldstop @@ -754,10 +734,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None - s.node.ApplyConfChange(cc) + s.r.ApplyConfChange(cc) return false, err } - *confState = *s.node.ApplyConfChange(cc) + *confState = *s.r.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode: m := new(Member) @@ -771,7 +751,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if m.ID == s.id { log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } else { - s.transport.AddPeer(m.ID, m.PeerURLs) + s.r.transport.AddPeer(m.ID, m.PeerURLs) log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } case raftpb.ConfChangeRemoveNode: @@ -780,7 +760,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if id == s.id { return true, nil } else { - s.transport.RemovePeer(id) + s.r.transport.RemovePeer(id) log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) } case raftpb.ConfChangeUpdateNode: @@ -795,7 +775,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if m.ID == s.id { log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } else { - s.transport.UpdatePeer(m.ID, m.PeerURLs) + s.r.transport.UpdatePeer(m.ID, m.PeerURLs) log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } } @@ -810,7 +790,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { if err != nil { log.Panicf("etcdserver: store save should never fail: %v", err) } - err = s.raftStorage.Compact(snapi, confState, d) + err = s.r.raftStorage.Compact(snapi, confState, d) if err != nil { // the snapshot was done asynchronously with the progress of raft. // raft might have already got a newer snapshot and called compact. @@ -821,78 +801,22 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { } log.Printf("etcdserver: compacted log at index %d", snapi) - if err := s.storage.Cut(); err != nil { + if err := s.r.storage.Cut(); err != nil { log.Panicf("etcdserver: rotate wal file should never fail: %v", err) } - snap, err := s.raftStorage.Snapshot() + snap, err := s.r.raftStorage.Snapshot() if err != nil { log.Panicf("etcdserver: snapshot error: %v", err) } - if err := s.storage.SaveSnap(snap); err != nil { + if err := s.r.storage.SaveSnap(snap); err != nil { log.Fatalf("etcdserver: save snapshot error: %v", err) } log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) } -// for testing -func (s *EtcdServer) PauseSending() { - p := s.transport.(rafthttp.Pausable) - p.Pause() -} +func (s *EtcdServer) PauseSending() { s.r.pauseSending() } -func (s *EtcdServer) ResumeSending() { - p := s.transport.(rafthttp.Pausable) - p.Resume() -} - -func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { - var err error - member := cfg.Cluster.MemberByName(cfg.Name) - metadata := pbutil.MustMarshal( - &pb.Metadata{ - NodeID: uint64(member.ID), - ClusterID: uint64(cfg.Cluster.ID()), - }, - ) - if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { - log.Fatalf("etcdserver create snapshot directory error: %v", err) - } - if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { - log.Fatalf("etcdserver: create wal error: %v", err) - } - peers := make([]raft.Peer, len(ids)) - for i, id := range ids { - ctx, err := json.Marshal((*cfg.Cluster).Member(id)) - if err != nil { - log.Panicf("marshal member should never fail: %v", err) - } - peers[i] = raft.Peer{ID: uint64(id), Context: ctx} - } - id = member.ID - log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) - s = raft.NewMemoryStorage() - n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s) - return -} - -func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { - var walsnap walpb.Snapshot - if snapshot != nil { - walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term - } - w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) - cfg.Cluster.SetID(cid) - - log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) - s := raft.NewMemoryStorage() - if snapshot != nil { - s.ApplySnapshot(*snapshot) - } - s.SetHardState(st) - s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s) - return id, n, s, w -} +func (s *EtcdServer) ResumeSending() { s.r.resumeSending() } // isBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index ab9b25366..87477ee20 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -456,7 +456,7 @@ func TestApplyConfChangeError(t *testing.T) { for i, tt := range tests { n := &nodeRecorder{} srv := &EtcdServer{ - node: n, + r: raftNode{Node: n}, Cluster: cl, } _, err := srv.applyConfChange(tt.cc, nil) @@ -483,10 +483,12 @@ func TestApplyConfChangeShouldStop(t *testing.T) { cl.AddMember(&Member{ID: types.ID(i)}) } srv := &EtcdServer{ - id: 1, - node: &nodeRecorder{}, - Cluster: cl, - transport: &nopTransporter{}, + id: 1, + r: raftNode{ + Node: &nodeRecorder{}, + transport: &nopTransporter{}, + }, + Cluster: cl, } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -522,12 +524,14 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { st := &storeRecorder{} srv := &EtcdServer{ - node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: newNodeCommitter(), + storage: &storageRecorder{}, + raftStorage: raft.NewMemoryStorage(), + transport: &nopTransporter{}, + }, + store: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.start() resp, err := srv.Do(context.Background(), tt) @@ -550,7 +554,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { wait := &waitRecorder{} srv := &EtcdServer{ - node: &nodeRecorder{}, + r: raftNode{Node: &nodeRecorder{}}, w: wait, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -569,7 +573,7 @@ func TestDoProposalCancelled(t *testing.T) { func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ - node: &nodeRecorder{}, + r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -582,7 +586,7 @@ func TestDoProposalTimeout(t *testing.T) { func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ - node: &nodeRecorder{}, + r: raftNode{Node: &nodeRecorder{}}, w: &waitRecorder{}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -598,7 +602,7 @@ func TestDoProposalStopped(t *testing.T) { func TestSync(t *testing.T) { n := &nodeRecorder{} srv := &EtcdServer{ - node: n, + r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } // check that sync is non-blocking @@ -631,7 +635,7 @@ func TestSync(t *testing.T) { func TestSyncTimeout(t *testing.T) { n := &nodeProposalBlockerRecorder{} srv := &EtcdServer{ - node: n, + r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } // check that sync is non-blocking @@ -656,13 +660,15 @@ func TestSyncTrigger(t *testing.T) { n := newReadyNode() st := make(chan time.Time, 1) srv := &EtcdServer{ - node: n, - raftStorage: raft.NewMemoryStorage(), - store: &storeRecorder{}, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - SyncTicker: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + transport: &nopTransporter{}, + storage: &storageRecorder{}, + }, + store: &storeRecorder{}, + SyncTicker: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.start() defer srv.Stop() @@ -700,10 +706,12 @@ func TestSnapshot(t *testing.T) { st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ - node: &nodeRecorder{}, - raftStorage: s, - store: st, - storage: p, + r: raftNode{ + Node: &nodeRecorder{}, + raftStorage: s, + storage: p, + }, + store: st, } srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}}) gaction := st.Action() @@ -731,13 +739,15 @@ func TestTriggerSnap(t *testing.T) { st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ - node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), - store: st, - transport: &nopTransporter{}, - storage: p, - snapCount: uint64(snapc), - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: newNodeCommitter(), + snapCount: uint64(snapc), + raftStorage: raft.NewMemoryStorage(), + storage: p, + transport: &nopTransporter{}, + }, + store: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } srv.start() for i := 0; i < snapc+1; i++ { @@ -766,12 +776,14 @@ func TestRecvSnapshot(t *testing.T) { cl := newCluster("abc") cl.SetStore(store.New()) s := &EtcdServer{ - store: st, - transport: &nopTransporter{}, - storage: p, - node: n, - raftStorage: raft.NewMemoryStorage(), - Cluster: cl, + r: raftNode{ + Node: n, + transport: &nopTransporter{}, + storage: p, + raftStorage: raft.NewMemoryStorage(), + }, + store: st, + Cluster: cl, } s.start() @@ -799,12 +811,14 @@ func TestRecvSlowSnapshot(t *testing.T) { cl := newCluster("abc") cl.SetStore(store.New()) s := &EtcdServer{ - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - node: n, - raftStorage: raft.NewMemoryStorage(), - Cluster: cl, + r: raftNode{ + Node: n, + storage: &storageRecorder{}, + raftStorage: raft.NewMemoryStorage(), + transport: &nopTransporter{}, + }, + store: st, + Cluster: cl, } s.start() @@ -832,12 +846,14 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { cl.SetStore(store.New()) storage := raft.NewMemoryStorage() s := &EtcdServer{ - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - node: n, - raftStorage: storage, - Cluster: cl, + r: raftNode{ + Node: n, + storage: &storageRecorder{}, + raftStorage: storage, + transport: &nopTransporter{}, + }, + store: st, + Cluster: cl, } s.start() @@ -874,13 +890,15 @@ func TestAddMember(t *testing.T) { st := store.New() cl.SetStore(st) s := &EtcdServer{ - node: n, - raftStorage: raft.NewMemoryStorage(), - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - Cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: &storageRecorder{}, + transport: &nopTransporter{}, + }, + store: st, + Cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}} @@ -911,13 +929,15 @@ func TestRemoveMember(t *testing.T) { cl.SetStore(store.New()) cl.AddMember(&Member{ID: 1234}) s := &EtcdServer{ - node: n, - raftStorage: raft.NewMemoryStorage(), - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - Cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: &storageRecorder{}, + transport: &nopTransporter{}, + }, + store: st, + Cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() err := s.RemoveMember(context.TODO(), 1234) @@ -947,13 +967,15 @@ func TestUpdateMember(t *testing.T) { cl.SetStore(st) cl.AddMember(&Member{ID: 1234}) s := &EtcdServer{ - node: n, - raftStorage: raft.NewMemoryStorage(), - store: st, - transport: &nopTransporter{}, - storage: &storageRecorder{}, - Cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: &storageRecorder{}, + transport: &nopTransporter{}, + }, + store: st, + Cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} @@ -983,9 +1005,9 @@ func TestPublish(t *testing.T) { w := &waitWithResponse{ch: ch} srv := &EtcdServer{ id: 1, + r: raftNode{Node: n}, attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, Cluster: &Cluster{}, - node: n, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -1022,13 +1044,15 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ - node: &nodeRecorder{}, - transport: &nopTransporter{}, - Cluster: &Cluster{}, - w: &waitRecorder{}, - done: make(chan struct{}), - stop: make(chan struct{}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), + r: raftNode{ + Node: &nodeRecorder{}, + transport: &nopTransporter{}, + }, + Cluster: &Cluster{}, + w: &waitRecorder{}, + done: make(chan struct{}), + stop: make(chan struct{}), + reqIDGen: idutil.NewGenerator(0, time.Time{}), } close(srv.done) srv.publish(time.Hour) @@ -1040,7 +1064,7 @@ func TestPublishRetry(t *testing.T) { defer log.SetOutput(os.Stderr) n := &nodeRecorder{} srv := &EtcdServer{ - node: n, + r: raftNode{Node: n}, w: &waitRecorder{}, done: make(chan struct{}), reqIDGen: idutil.NewGenerator(0, time.Time{}), diff --git a/integration/cluster_test.go b/integration/cluster_test.go index d5c2c4142..f4aaf19f2 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -484,6 +484,7 @@ func mustNewMember(t *testing.T, name string) *member { m.NewCluster = true m.Transport = mustNewTransport(t) m.ElectionTicks = electionTicks + m.TickMs = uint(tickDuration / time.Millisecond) return m } @@ -524,7 +525,6 @@ func (m *member) Launch() error { if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { return fmt.Errorf("failed to initialize the etcd server: %v", err) } - m.s.Ticker = time.Tick(tickDuration) m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start()