mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
b17c1de30c
@ -744,21 +744,6 @@ func (s *EtcdServer) run() {
|
||||
// asynchronously accept toApply packets, dispatch progress in-order
|
||||
sched := schedule.NewFIFOScheduler(lg)
|
||||
|
||||
var (
|
||||
smu sync.RWMutex
|
||||
syncC <-chan time.Time
|
||||
)
|
||||
setSyncC := func(ch <-chan time.Time) {
|
||||
smu.Lock()
|
||||
syncC = ch
|
||||
smu.Unlock()
|
||||
}
|
||||
getSyncC := func() (ch <-chan time.Time) {
|
||||
smu.RLock()
|
||||
ch = syncC
|
||||
smu.RUnlock()
|
||||
return
|
||||
}
|
||||
rh := &raftReadyHandler{
|
||||
getLead: func() (lead uint64) { return s.getLead() },
|
||||
updateLead: func(lead uint64) { s.setLead(lead) },
|
||||
@ -770,7 +755,6 @@ func (s *EtcdServer) run() {
|
||||
if s.compactor != nil {
|
||||
s.compactor.Pause()
|
||||
}
|
||||
setSyncC(nil)
|
||||
} else {
|
||||
if newLeader {
|
||||
t := time.Now()
|
||||
@ -778,7 +762,6 @@ func (s *EtcdServer) run() {
|
||||
s.leadElectedTime = t
|
||||
s.leadTimeMu.Unlock()
|
||||
}
|
||||
setSyncC(s.SyncTicker.C)
|
||||
if s.compactor != nil {
|
||||
s.compactor.Resume()
|
||||
}
|
||||
@ -845,10 +828,6 @@ func (s *EtcdServer) run() {
|
||||
lg.Warn("server error", zap.Error(err))
|
||||
lg.Warn("data-dir used by this member must be removed")
|
||||
return
|
||||
case <-getSyncC():
|
||||
if s.v2store.HasTTLKeys() {
|
||||
s.sync(s.Cfg.ReqTimeout())
|
||||
}
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
@ -1689,25 +1668,6 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
|
||||
}
|
||||
}
|
||||
|
||||
// sync proposes a SYNC request and is non-blocking.
|
||||
// This makes no guarantee that the request will be proposed or performed.
|
||||
// The request will be canceled after the given timeout.
|
||||
func (s *EtcdServer) sync(timeout time.Duration) {
|
||||
req := pb.Request{
|
||||
Method: "SYNC",
|
||||
ID: s.reqIDGen.Next(),
|
||||
Time: time.Now().UnixNano(),
|
||||
}
|
||||
data := pbutil.MustMarshal(&req)
|
||||
// There is no promise that node has leader when do SYNC request,
|
||||
// so it uses goroutine to propose.
|
||||
ctx, cancel := context.WithTimeout(s.ctx, timeout)
|
||||
s.GoAttach(func() {
|
||||
s.r.Propose(ctx, data)
|
||||
cancel()
|
||||
})
|
||||
}
|
||||
|
||||
// publishV3 registers server information into the cluster using v3 request. The
|
||||
// information is the JSON representation of this server's member struct, updated
|
||||
// with the static clientURLs of the server.
|
||||
|
@ -478,143 +478,6 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
|
||||
func TestSync(t *testing.T) {
|
||||
n := newNodeRecorder()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zaptest.NewLogger(t),
|
||||
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
||||
|
||||
// check that sync is non-blocking
|
||||
done := make(chan struct{}, 1)
|
||||
go func() {
|
||||
srv.sync(10 * time.Second)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("sync should be non-blocking but did not return after 1s!")
|
||||
}
|
||||
|
||||
action, _ := n.Wait(1)
|
||||
if len(action) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(action))
|
||||
}
|
||||
if action[0].Name != "Propose" {
|
||||
t.Fatalf("action = %s, want Propose", action[0].Name)
|
||||
}
|
||||
data := action[0].Params[0].([]byte)
|
||||
var r pb.Request
|
||||
if err := r.Unmarshal(data); err != nil {
|
||||
t.Fatalf("unmarshal request error: %v", err)
|
||||
}
|
||||
if r.Method != "SYNC" {
|
||||
t.Errorf("method = %s, want SYNC", r.Method)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
|
||||
// after timeout
|
||||
func TestSyncTimeout(t *testing.T) {
|
||||
n := newProposalBlockerRecorder()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zaptest.NewLogger(t),
|
||||
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
||||
|
||||
// check that sync is non-blocking
|
||||
done := make(chan struct{}, 1)
|
||||
go func() {
|
||||
srv.sync(0)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("sync should be non-blocking but did not return after 1s!")
|
||||
}
|
||||
|
||||
w := []testutil.Action{{Name: "Propose blocked"}}
|
||||
if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("action = %v, want %v", g, w)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: TestNoSyncWhenNoLeader
|
||||
|
||||
// TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
|
||||
func TestSyncTrigger(t *testing.T) {
|
||||
n := newReadyNode()
|
||||
st := make(chan time.Time, 1)
|
||||
tk := &time.Ticker{C: st}
|
||||
r := newRaftNode(raftNodeConfig{
|
||||
lg: zaptest.NewLogger(t),
|
||||
Node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: newNopTransporter(),
|
||||
storage: mockstorage.NewStorageRecorder(""),
|
||||
})
|
||||
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zaptest.NewLogger(t),
|
||||
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *r,
|
||||
v2store: mockstore.NewNop(),
|
||||
SyncTicker: tk,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
|
||||
// trigger the server to become a leader and accept sync requests
|
||||
go func() {
|
||||
srv.start()
|
||||
n.readyc <- raft.Ready{
|
||||
SoftState: &raft.SoftState{
|
||||
RaftState: raft.StateLeader,
|
||||
},
|
||||
}
|
||||
// trigger a sync request
|
||||
st <- time.Time{}
|
||||
}()
|
||||
|
||||
action, _ := n.Wait(1)
|
||||
go srv.Stop()
|
||||
|
||||
if len(action) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(action))
|
||||
}
|
||||
if action[0].Name != "Propose" {
|
||||
t.Fatalf("action = %s, want Propose", action[0].Name)
|
||||
}
|
||||
data := action[0].Params[0].([]byte)
|
||||
var req pb.Request
|
||||
if err := req.Unmarshal(data); err != nil {
|
||||
t.Fatalf("error unmarshalling data: %v", err)
|
||||
}
|
||||
if req.Method != "SYNC" {
|
||||
t.Fatalf("unexpected proposed request: %#v", req.Method)
|
||||
}
|
||||
|
||||
// wait on stop message
|
||||
<-n.Chan()
|
||||
}
|
||||
|
||||
// TestSnapshot should snapshot the store and cut the persistent
|
||||
func TestSnapshot(t *testing.T) {
|
||||
revertFunc := verify.DisableVerifications()
|
||||
@ -1302,20 +1165,6 @@ func (n *nodeRecorder) ForgetLeader(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nodeProposalBlockerRecorder struct {
|
||||
nodeRecorder
|
||||
}
|
||||
|
||||
func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
|
||||
return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
|
||||
}
|
||||
|
||||
func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
|
||||
<-ctx.Done()
|
||||
n.Record(testutil.Action{Name: "Propose blocked"})
|
||||
return nil
|
||||
}
|
||||
|
||||
// readyNode is a nodeRecorder with a user-writeable ready channel
|
||||
type readyNode struct {
|
||||
nodeRecorder
|
||||
|
Loading…
x
Reference in New Issue
Block a user