*: gracefully stop etcdserver

Commit 8bf71d796e (parent 4e251f8624)
Mirror of https://github.com/etcd-io/etcd.git
@@ -170,8 +170,9 @@ func Main() {
     }
 
     shouldProxy := proxyFlag.String() != proxyFlagOff
+    var stopped <-chan struct{}
     if !shouldProxy {
-        err = startEtcd()
+        stopped, err = startEtcd()
         if err == discovery.ErrFullCluster && fallbackFlag.String() == fallbackFlagProxy {
             log.Printf("etcd: discovery cluster full, falling back to %s", fallbackFlagProxy)
             shouldProxy = true
@@ -183,19 +184,18 @@ func Main() {
     if err != nil {
         log.Fatalf("etcd: %v", err)
     }
-    // Block indefinitely
-    <-make(chan struct{})
+    <-stopped
 }
 
 // startEtcd launches the etcd server and HTTP handlers for client/server communication.
-func startEtcd() error {
+func startEtcd() (<-chan struct{}, error) {
     apurls, err := flags.URLsFromFlags(fs, "initial-advertise-peer-urls", "addr", peerTLSInfo)
     if err != nil {
-        return err
+        return nil, err
     }
     cls, err := setupCluster(apurls)
     if err != nil {
-        return fmt.Errorf("error setting up initial cluster: %v", err)
+        return nil, fmt.Errorf("error setting up initial cluster: %v", err)
     }
 
     if *dir == "" {
@@ -203,25 +203,25 @@ func startEtcd() error {
         log.Printf("no data-dir provided, using default data-dir ./%s", *dir)
     }
     if err := os.MkdirAll(*dir, privateDirMode); err != nil {
-        return fmt.Errorf("cannot create data directory: %v", err)
+        return nil, fmt.Errorf("cannot create data directory: %v", err)
     }
     if err := fileutil.IsDirWriteable(*dir); err != nil {
-        return fmt.Errorf("cannot write to data directory: %v", err)
+        return nil, fmt.Errorf("cannot write to data directory: %v", err)
     }
 
     pt, err := transport.NewTransport(peerTLSInfo)
     if err != nil {
-        return err
+        return nil, err
     }
 
     acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo)
     if err != nil {
-        return err
+        return nil, err
     }
 
     lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
     if err != nil {
-        return err
+        return nil, err
     }
 
     if !peerTLSInfo.Empty() {
@@ -232,7 +232,7 @@ func startEtcd() error {
         var l net.Listener
         l, err = transport.NewListener(u.Host, u.Scheme, peerTLSInfo)
         if err != nil {
-            return err
+            return nil, err
         }
 
         urlStr := u.String()
@@ -248,7 +248,7 @@ func startEtcd() error {
 
     lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
     if err != nil {
-        return err
+        return nil, err
     }
 
     if !clientTLSInfo.Empty() {
@@ -259,7 +259,7 @@ func startEtcd() error {
         var l net.Listener
         l, err = transport.NewListener(u.Host, u.Scheme, clientTLSInfo)
         if err != nil {
-            return err
+            return nil, err
         }
 
         urlStr := u.String()
@@ -289,7 +289,7 @@ func startEtcd() error {
     var s *etcdserver.EtcdServer
     s, err = etcdserver.NewServer(cfg)
     if err != nil {
-        return err
+        return nil, err
     }
     s.Start()
 
@@ -313,7 +313,7 @@ func startEtcd() error {
             log.Fatal(http.Serve(l, ch))
         }(l)
     }
-    return nil
+    return s.StopNotify(), nil
 }
 
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
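The hunk above is the caller-side half of the change: instead of parking Main on <-make(chan struct{}) forever, startEtcd now hands back a stop channel (via s.StopNotify()) and Main blocks on it, returning only once the server has actually shut down. A minimal, self-contained sketch of that pattern, using illustrative names rather than the real etcdmain symbols:

    package main

    import "log"

    // startServer stands in for startEtcd here: it launches the work in a
    // goroutine and returns a channel that is closed once the server has
    // fully stopped.
    func startServer() (<-chan struct{}, error) {
        stopped := make(chan struct{})
        go func() {
            defer close(stopped) // signal shutdown to the caller
            // ... serve until asked to stop ...
        }()
        return stopped, nil
    }

    func main() {
        stopped, err := startServer()
        if err != nil {
            log.Fatalf("start: %v", err)
        }
        <-stopped // block here instead of <-make(chan struct{}); returns on graceful stop
    }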
@@ -37,23 +37,25 @@ const (
 )
 
 type sendHub struct {
-    tr      *http.Transport
-    cl      ClusterInfo
-    ss      *stats.ServerStats
-    ls      *stats.LeaderStats
-    senders map[types.ID]*sender
+    tr         http.RoundTripper
+    cl         ClusterInfo
+    ss         *stats.ServerStats
+    ls         *stats.LeaderStats
+    senders    map[types.ID]*sender
+    shouldstop chan struct{}
 }
 
 // newSendHub creates the default send hub used to transport raft messages
 // to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
-func newSendHub(t *http.Transport, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
     h := &sendHub{
-        tr:      t,
-        cl:      cl,
-        ss:      ss,
-        ls:      ls,
-        senders: make(map[types.ID]*sender),
+        tr:         t,
+        cl:         cl,
+        ss:         ss,
+        ls:         ls,
+        senders:    make(map[types.ID]*sender),
+        shouldstop: make(chan struct{}, 1),
     }
     for _, m := range cl.Members() {
         h.Add(m)
@@ -94,6 +96,10 @@ func (h *sendHub) Stop() {
     }
 }
 
+func (h *sendHub) ShouldStopNotify() <-chan struct{} {
+    return h.shouldstop
+}
+
 func (h *sendHub) Add(m *Member) {
     if _, ok := h.senders[m.ID]; ok {
         return
@@ -101,7 +107,7 @@ func (h *sendHub) Add(m *Member) {
     // TODO: considering how to switch between all available peer urls
     u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
     fs := h.ls.Follower(m.ID.String())
-    s := newSender(h.tr, u, h.cl.ID(), fs)
+    s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop)
     h.senders[m.ID] = s
 }
 
@@ -128,22 +134,24 @@ func (h *sendHub) Update(m *Member) {
 }
 
 type sender struct {
-    tr  http.RoundTripper
-    u   string
-    cid types.ID
-    fs  *stats.FollowerStats
-    q   chan []byte
-    mu  sync.RWMutex
-    wg  sync.WaitGroup
+    tr         http.RoundTripper
+    u          string
+    cid        types.ID
+    fs         *stats.FollowerStats
+    q          chan []byte
+    mu         sync.RWMutex
+    wg         sync.WaitGroup
+    shouldstop chan struct{}
 }
 
-func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats) *sender {
+func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
     s := &sender{
-        tr:  tr,
-        u:   u,
-        cid: cid,
-        fs:  fs,
-        q:   make(chan []byte),
+        tr:         tr,
+        u:          u,
+        cid:        cid,
+        fs:         fs,
+        q:          make(chan []byte),
+        shouldstop: shouldstop,
     }
     s.wg.Add(connPerSender)
     for i := 0; i < connPerSender; i++ {
@@ -201,13 +209,19 @@ func (s *sender) post(data []byte) error {
 
     switch resp.StatusCode {
     case http.StatusPreconditionFailed:
-        // TODO: shutdown the etcdserver gracefully?
-        log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+        select {
+        case s.shouldstop <- struct{}{}:
+        default:
+        }
+        log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
         return nil
     case http.StatusForbidden:
-        // TODO: stop the server
-        log.Println("etcd: this member has been permanently removed from the cluster")
-        log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+        select {
+        case s.shouldstop <- struct{}{}:
+        default:
+        }
+        log.Println("etcdserver: this member has been permanently removed from the cluster")
+        log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
         return nil
     case http.StatusNoContent:
         return nil
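The sender changes above hinge on one idiom: shouldstop is a channel with a buffer of one, and every failing request path tries to notify it through a select with an empty default case. The first failure latches the signal; later failures, or a second sender hitting the same condition, fall through without blocking any sender goroutine. A small stand-alone sketch of that idiom, with illustrative names:

    package main

    import "fmt"

    func main() {
        // Buffer of one: a notification can be stored even before anyone listens.
        shouldstop := make(chan struct{}, 1)

        notify := func() {
            select {
            case shouldstop <- struct{}{}: // first notification is latched
            default: // a signal is already pending; drop this one instead of blocking
            }
        }

        // Several failure paths may all try to notify; none of them block.
        for i := 0; i < 3; i++ {
            notify()
        }

        select {
        case <-shouldstop:
            fmt.Println("received stop notification")
        default:
            fmt.Println("no notification pending")
        }
    }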
@@ -89,12 +89,40 @@ func TestSendHubRemove(t *testing.T) {
     }
 }
 
+func TestSendHubShouldStop(t *testing.T) {
+    membs := []*Member{
+        newTestMember(1, []string{"http://a"}, "", nil),
+    }
+    tr := newRespRoundTripper(http.StatusForbidden, nil)
+    cl := newTestCluster(membs)
+    ls := stats.NewLeaderStats("")
+    h := newSendHub(tr, cl, nil, ls)
+    // wait for handle goroutines start
+    // TODO: wait for goroutines ready before return newSender
+    time.Sleep(10 * time.Millisecond)
+
+    shouldstop := h.ShouldStopNotify()
+    select {
+    case <-shouldstop:
+        t.Fatalf("received unexpected shouldstop notification")
+    case <-time.After(10 * time.Millisecond):
+    }
+    h.senders[1].send([]byte("somedata"))
+
+    testutil.ForceGosched()
+    select {
+    case <-shouldstop:
+    default:
+        t.Fatalf("cannot receive stop notification")
+    }
+}
+
 // TestSenderSend tests that send func could post data using roundtripper
 // and increase success count in stats.
 func TestSenderSend(t *testing.T) {
     tr := &roundTripperRecorder{}
     fs := &stats.FollowerStats{}
-    s := newSender(tr, "http://10.0.0.1", types.ID(1), fs)
+    s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
     // wait for handle goroutines start
     // TODO: wait for goroutines ready before return newSender
     time.Sleep(10 * time.Millisecond)
@@ -116,7 +144,7 @@ func TestSenderSend(t *testing.T) {
 func TestSenderExceedMaximalServing(t *testing.T) {
     tr := newRoundTripperBlocker()
     fs := &stats.FollowerStats{}
-    s := newSender(tr, "http://10.0.0.1", types.ID(1), fs)
+    s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
     // wait for handle goroutines start
     // TODO: wait for goroutines ready before return newSender
     time.Sleep(10 * time.Millisecond)
@@ -144,7 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 // it increases fail count in stats.
 func TestSenderSendFailed(t *testing.T) {
     fs := &stats.FollowerStats{}
-    s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs)
+    s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
     // wait for handle goroutines start
     // TODO: wait for goroutines ready before return newSender
     time.Sleep(10 * time.Millisecond)
@@ -162,7 +190,7 @@ func TestSenderSendFailed(t *testing.T) {
 
 func TestSenderPost(t *testing.T) {
     tr := &roundTripperRecorder{}
-    s := newSender(tr, "http://10.0.0.1", types.ID(1), nil)
+    s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
     if err := s.post([]byte("some data")); err != nil {
         t.Fatalf("unexpect post error: %v", err)
     }
@@ -204,7 +232,8 @@ func TestSenderPostBad(t *testing.T) {
         {"http://10.0.0.1", http.StatusCreated, nil},
     }
     for i, tt := range tests {
-        s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil)
+        shouldstop := make(chan struct{})
+        s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
         err := s.post([]byte("some data"))
         s.stop()
 
@@ -214,6 +243,28 @@ func TestSenderPostBad(t *testing.T) {
     }
 }
 
+func TestSenderPostShouldStop(t *testing.T) {
+    tests := []struct {
+        u    string
+        code int
+        err  error
+    }{
+        {"http://10.0.0.1", http.StatusForbidden, nil},
+        {"http://10.0.0.1", http.StatusPreconditionFailed, nil},
+    }
+    for i, tt := range tests {
+        shouldstop := make(chan struct{}, 1)
+        s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
+        s.post([]byte("some data"))
+        s.stop()
+        select {
+        case <-shouldstop:
+        default:
+            t.Fatalf("#%d: cannot receive shouldstop notification", i)
+        }
+    }
+}
+
 type roundTripperBlocker struct {
     c chan struct{}
 }
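The new tests call testutil.ForceGosched to let the sender's worker goroutines run before asserting on the shouldstop channel. The real helper lives in etcd's pkg/testutil; a stand-in with the same intent might look like the following sketch (illustrative only, not the project's actual implementation):

    package testutil

    import "runtime"

    // ForceGosched repeatedly yields the processor so that other runnable
    // goroutines (for example a sender's worker loop) get a chance to make
    // progress before the test continues. It is a scheduling hint, not a
    // synchronization guarantee.
    func ForceGosched() {
        for i := 0; i < 10000; i++ {
            runtime.Gosched()
        }
    }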
@@ -91,6 +91,7 @@ type Sender interface {
     Remove(id types.ID)
     Update(m *Member)
     Stop()
+    ShouldStopNotify() <-chan struct{}
 }
 
 type Storage interface {
@@ -327,6 +328,14 @@ func (s *EtcdServer) run() {
     // snapi indicates the index of the last submitted snapshot request
     var snapi, appliedi uint64
     var nodes []uint64
+    var shouldstop bool
+    shouldstopC := s.sender.ShouldStopNotify()
+
+    defer func() {
+        s.node.Stop()
+        s.sender.Stop()
+        close(s.done)
+    }()
     for {
         select {
         case <-s.Ticker:
@@ -372,7 +381,9 @@ func (s *EtcdServer) run() {
                 if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
                     ents = rd.CommittedEntries[appliedi+1-firsti:]
                 }
-                appliedi = s.apply(ents)
+                if appliedi, shouldstop = s.apply(ents); shouldstop {
+                    return
+                }
             }
 
             s.node.Advance()
@@ -386,10 +397,9 @@ func (s *EtcdServer) run() {
             }
         case <-syncC:
             s.sync(defaultSyncTimeout)
+        case <-shouldstopC:
+            return
         case <-s.stop:
-            s.node.Stop()
-            s.sender.Stop()
-            close(s.done)
             return
         }
     }
@@ -612,7 +622,7 @@ func getExpirationTime(r *pb.Request) time.Time {
 
 // apply takes an Entry received from Raft (after it has been committed) and
 // applies it to the current state of the EtcdServer
-func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
+func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
     var applied uint64
     for i := range es {
         e := es[i]
@@ -624,7 +634,11 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
         case raftpb.EntryConfChange:
             var cc raftpb.ConfChange
             pbutil.MustUnmarshal(&cc, e.Data)
-            s.w.Trigger(cc.ID, s.applyConfChange(cc))
+            shouldstop, err := s.applyConfChange(cc)
+            s.w.Trigger(cc.ID, err)
+            if shouldstop {
+                return applied, true
+            }
         default:
             log.Panicf("entry type should be either EntryNormal or EntryConfChange")
         }
@@ -632,7 +646,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
         atomic.StoreUint64(&s.raftTerm, e.Term)
         applied = e.Index
     }
-    return applied
+    return applied, false
 }
 
 // applyRequest interprets r as a call to store.X and returns a Response interpreted
@@ -686,11 +700,11 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
     if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
         cc.NodeID = raft.None
         s.node.ApplyConfChange(cc)
-        return err
+        return false, err
     }
     s.node.ApplyConfChange(cc)
     switch cc.Type {
@@ -714,6 +728,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
         s.Cluster.RemoveMember(id)
         if id == s.id {
             log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID())
+            log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
+            return true, nil
         } else {
             s.sender.Remove(id)
             log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
@@ -734,7 +750,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
             log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
         }
     }
-    return nil
+    return false, nil
 }
 
 // TODO: non-blocking snapshot
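Taken together, the server changes funnel every exit path through one place: run() now returns when Stop() is requested, when a configuration change removes the local member (apply reports shouldstop), or when a sender signals shouldstopC, and the new deferred block stops the raft node, stops the sender hub, and closes s.done, which is presumably the channel StopNotify hands back to startEtcd. A compressed, runnable sketch of that lifecycle, with names echoing the diff but otherwise simplified:

    package main

    // server is a stripped-down stand-in for EtcdServer, kept only to show
    // the stop flow introduced by this commit.
    type server struct {
        stop chan struct{} // signalled by an explicit Stop() request
        done chan struct{} // closed when run() has finished cleaning up
    }

    // StopNotify returns a channel that is closed after the server has stopped.
    func (s *server) StopNotify() <-chan struct{} { return s.done }

    func (s *server) run(shouldstopC <-chan struct{}) {
        defer func() {
            // single cleanup path for every way out of the loop
            // (the real server also stops the raft node and the sender hub here)
            close(s.done)
        }()
        for {
            select {
            case <-shouldstopC: // a sender reported removal or a cluster-ID conflict
                return
            case <-s.stop: // explicit shutdown request
                return
            }
        }
    }

    func main() {
        s := &server{stop: make(chan struct{}), done: make(chan struct{})}
        shouldstopC := make(chan struct{}, 1)
        go s.run(shouldstopC)

        shouldstopC <- struct{}{} // simulate a sender asking for shutdown
        <-s.StopNotify()          // the caller blocks until cleanup has finished
    }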
@@ -474,7 +474,7 @@ func TestApplyConfChangeError(t *testing.T) {
             node:    n,
             Cluster: cl,
         }
-        err := srv.applyConfChange(tt.cc)
+        _, err := srv.applyConfChange(tt.cc)
         if err != tt.werr {
             t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
         }
@@ -491,6 +491,42 @@ func TestApplyConfChangeError(t *testing.T) {
     }
 }
 
+func TestApplyConfChangeShouldStop(t *testing.T) {
+    cl := newCluster("")
+    cl.SetStore(store.New())
+    for i := 1; i <= 3; i++ {
+        cl.AddMember(&Member{ID: types.ID(i)})
+    }
+    srv := &EtcdServer{
+        id:      1,
+        node:    &nodeRecorder{},
+        Cluster: cl,
+        sender:  &nopSender{},
+    }
+    cc := raftpb.ConfChange{
+        Type:   raftpb.ConfChangeRemoveNode,
+        NodeID: 2,
+    }
+    // remove non-local member
+    shouldStop, err := srv.applyConfChange(cc)
+    if err != nil {
+        t.Fatalf("unexpected error %v", err)
+    }
+    if shouldStop != false {
+        t.Errorf("shouldStop = %t, want %t", shouldStop, false)
+    }
+
+    // remove local member
+    cc.NodeID = 1
+    shouldStop, err = srv.applyConfChange(cc)
+    if err != nil {
+        t.Fatalf("unexpected error %v", err)
+    }
+    if shouldStop != true {
+        t.Errorf("shouldStop = %t, want %t", shouldStop, true)
+    }
+}
+
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 
@@ -503,10 +539,11 @@ func (s *fakeSender) Send(msgs []raftpb.Message) {
         s.ss[m.To-1].node.Step(context.TODO(), m)
     }
 }
-func (s *fakeSender) Add(m *Member)      {}
-func (s *fakeSender) Update(m *Member)   {}
-func (s *fakeSender) Remove(id types.ID) {}
-func (s *fakeSender) Stop()              {}
+func (s *fakeSender) Add(m *Member)                     {}
+func (s *fakeSender) Update(m *Member)                  {}
+func (s *fakeSender) Remove(id types.ID)                {}
+func (s *fakeSender) Stop()                             {}
+func (s *fakeSender) ShouldStopNotify() <-chan struct{} { return nil }
 
 func testServer(t *testing.T, ns uint64) {
     ctx, cancel := context.WithCancel(context.Background())
@@ -1556,11 +1593,12 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
 type nopSender struct{}
 
-func (s *nopSender) Send(m []raftpb.Message) {}
-func (s *nopSender) Add(m *Member)           {}
-func (s *nopSender) Remove(id types.ID)      {}
-func (s *nopSender) Update(m *Member)        {}
-func (s *nopSender) Stop()                   {}
+func (s *nopSender) Send(m []raftpb.Message)           {}
+func (s *nopSender) Add(m *Member)                     {}
+func (s *nopSender) Remove(id types.ID)                {}
+func (s *nopSender) Update(m *Member)                  {}
+func (s *nopSender) Stop()                             {}
+func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil }
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
     peers := make([]raft.Peer, len(ids))