diff --git a/etcd/etcd.go b/etcd/etcd.go index 86f413278..725fe16fa 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -41,6 +41,11 @@ const ( stop ) +var ( + removeTmpErr = fmt.Errorf("remove: try again") + serverStopErr = fmt.Errorf("server is stopped") +) + type Server struct { config *config.Config @@ -147,6 +152,10 @@ func (s *Server) Run() { } func (s *Server) Stop() { + if s.mode == stop { + return + } + s.mode = stop close(s.stop) s.t.stop() } @@ -194,16 +203,17 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error { _, err := s.Get(p, false, false) if err == nil { - return fmt.Errorf("existed node") + return nil } if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound { return err } for { - if s.mode == stop { + select { + case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}: + case <-s.stop: return fmt.Errorf("server is stopped") } - s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)} w, err := s.Watch(p, true, false, index+1) if err != nil { return err @@ -215,34 +225,45 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error { } index = v.Index() case <-time.After(4 * defaultHeartbeat * s.tickDuration): + case <-s.stop: + return fmt.Errorf("server is stopped") } } } func (s *Server) Remove(id int64) error { p := path.Join(v2machineKVPrefix, fmt.Sprint(id)) - index := s.Index() - if _, err := s.Get(p, false, false); err != nil { - return err + v, err := s.Get(p, false, false) + if err != nil { + return nil } - for { - if s.mode == stop { - return fmt.Errorf("server is stopped") - } - s.removeNodeC <- raft.Config{NodeId: id} - w, err := s.Watch(p, true, false, index+1) - if err != nil { - return err - } - select { - case v := <-w.EventChan: - if v.Action == store.Delete { - return nil - } - index = v.Index() - case <-time.After(4 * defaultHeartbeat * s.tickDuration): + + select { + case s.removeNodeC <- raft.Config{NodeId: id}: + case <-s.stop: + return serverStopErr + } + + // TODO(xiangli): do not need to watch if the + // removal target is self + w, err := s.Watch(p, true, false, v.Index()+1) + if err != nil { + return removeTmpErr + } + + select { + case v := <-w.EventChan: + if v.Action == store.Delete { + return nil } + return removeTmpErr + case <-time.After(4 * defaultHeartbeat * s.tickDuration): + w.Remove() + return removeTmpErr + case <-s.stop: + w.Remove() + return serverStopErr } } @@ -291,15 +312,14 @@ func (s *Server) runParticipant() { node.Sync() case <-s.stop: log.Printf("Node: %d stopped\n", s.id) - s.mode = stop return } s.apply(node.Next()) s.send(node.Msgs()) if node.IsRemoved() { // TODO: delete it after standby is implemented - s.mode = stop log.Printf("Node: %d removed from participants\n", s.id) + s.Stop() return } } diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index b650f7f9c..8cfa9a63b 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -135,78 +135,61 @@ func TestAdd(t *testing.T) { for i := range hs { hs[len(hs)-i-1].Close() } - afterTest(t) } + afterTest(t) } func TestRemove(t *testing.T) { - tests := []struct { - size int - round int - }{ - {3, 5}, - {4, 5}, - {5, 5}, - {6, 5}, - } + tests := []int{3, 4, 5, 6} for _, tt := range tests { - es, hs := buildCluster(tt.size, false) + es, hs := buildCluster(tt, false) waitCluster(t, es) // we don't remove the machine from 2-node cluster because it is // not 100 percent safe in our raft. // TODO(yichengq): improve it later. - for i := 0; i < tt.size-2; i++ { + for i := 0; i < tt-2; i++ { id := int64(i) - var index uint64 + send := id for { - lead := es[id].node.Leader() - if lead != -1 { - index = es[lead].Index() - if err := es[lead].Remove(id); err == nil { - break - } + send++ + if send > int64(tt-1) { + send = id } - runtime.Gosched() - } - // i-th machine cannot be promised to apply the removal command of - // its own due to our non-optimized raft. - // TODO(yichengq): it should work when - // https://github.com/etcd-team/etcd/pull/7 is merged. - for j := i + 1; j < tt.size; j++ { - w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1) - if err != nil { - t.Errorf("#%d on %d: %v", i, j, err) + lead := es[send].node.Leader() + if lead == -1 { + time.Sleep(defaultElection * 5 * time.Millisecond) + continue + } + + err := es[lead].Remove(id) + if err == nil { break } - v := <-w.EventChan - ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i) - if v.Node.Key != ww { - t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww) + switch err { + case removeTmpErr: + time.Sleep(defaultElection * 5 * time.Millisecond) + case serverStopErr: + if lead == id { + break + } + default: + t.Fatal(err) } } - - // may need to wait for msgDenial - // TODO(yichengq): no need to sleep here when previous issue is merged. - if es[i].mode == stop { - continue - } - time.Sleep(defaultElection * defaultTickDuration) - if g := es[i].mode; g != stop { - t.Errorf("#%d: mode = %d, want stop", i, g) - } + <-es[i].stop } - for i := range hs { + for i := range es { es[len(hs)-i-1].Stop() } for i := range hs { hs[len(hs)-i-1].Close() } - afterTest(t) } + afterTest(t) } func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { diff --git a/etcd/v2_http_endpoint_test.go b/etcd/v2_http_endpoint_test.go index c61684c04..62db3645b 100644 --- a/etcd/v2_http_endpoint_test.go +++ b/etcd/v2_http_endpoint_test.go @@ -216,8 +216,8 @@ func TestPutAdminConfigEndPoint(t *testing.T) { for j := range hs { hs[len(hs)-j-1].Close() } - afterTest(t) } + afterTest(t) } func TestGetAdminMachineEndPoint(t *testing.T) {