From 98fdbaaae0f5aab694f0e48616bcaeb64260c703 Mon Sep 17 00:00:00 2001
From: Yicheng Qin
Date: Fri, 11 Jul 2014 09:55:30 -0700
Subject: [PATCH] server: add remove function

---
 etcd/etcd.go      | 42 ++++++++++++++++++++++
 etcd/etcd_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 130 insertions(+)

diff --git a/etcd/etcd.go b/etcd/etcd.go
index 29da4c1f3..14946b6c5 100644
--- a/etcd/etcd.go
+++ b/etcd/etcd.go
@@ -175,6 +175,33 @@ func (s *Server) Join() {
 	s.run()
 }
 
+// Remove proposes removing the node identified by id from the cluster.
+//
+// It marshals a raft.Config naming that node, wraps it in a raft.Message
+// and sends the result to this server's own raft endpoint
+// (s.raftPubAddr+raftPrefix). Marshalling failures panic; a failed send is
+// only logged. Remove does not wait for the removal to be committed.
+func (s *Server) Remove(id int) {
+	// The config must name the node being removed (id), which need not be
+	// the server issuing the request (s.id).
+	d, err := json.Marshal(&raft.Config{NodeId: int64(id)})
+	if err != nil {
+		panic(err)
+	}
+
+	// NOTE(review): message Type 2 and entry Type 2 are presumably the raft
+	// proposal and remove-node constants -- confirm against package raft.
+	b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 2, Data: d}}})
+	if err != nil {
+		panic(err)
+	}
+
+	if err := s.t.send(s.raftPubAddr+raftPrefix, b); err != nil {
+		log.Println(err)
+	}
+	// todo(xiangli) WAIT for remove to be committed or retry...
+}
+
 func (s *Server) run() {
 	for {
 		switch s.mode {
@@ -219,6 +246,12 @@ func (s *Server) runParticipant() {
 		}
 		s.apply(node.Next())
 		s.send(node.Msgs())
+		if node.IsRemoved() {
+			// TODO: delete it after standby is implemented
+			s.mode = stop
+			log.Printf("Node: %d removed from participants\n", s.id)
+			return
+		}
 	}
 }
 
@@ -250,6 +283,15 @@ func (s *Server) apply(ents []raft.Entry) {
 			s.nodes[cfg.Addr] = true
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
+		case raft.RemoveNode:
+			cfg := new(raft.Config)
+			if err := json.Unmarshal(ent.Data, cfg); err != nil {
+				log.Println(err)
+				break
+			}
+			log.Printf("Remove Node %x\n", cfg.NodeId)
+			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
+			s.Store.Delete(p, false, false)
 		default:
 			panic("unimplemented")
 		}
diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go
index f8917b58e..e27fac4e1 100644
--- a/etcd/etcd_test.go
+++ b/etcd/etcd_test.go
@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"runtime"
"testing" "time" @@ -75,6 +76,93 @@ func TestV2Redirect(t *testing.T) { afterTest(t) } +func TestRemove(t *testing.T) { + tests := []struct { + size int + round int + }{ + {3, 5}, + {4, 5}, + {5, 5}, + {6, 5}, + } + + for _, tt := range tests { + es, hs := buildCluster(tt.size, false) + waitCluster(t, es) + + // we don't remove the machine from 2-node cluster because it is + // not 100 percent safe in our raft. + // TODO(yichengq): improve it later. + for i := 0; i < tt.size-2; i++ { + // wait for leader to be stable for all live machines + // TODO(yichengq): change it later + var prevLead int64 + var prevTerm int64 + for j := i; j < tt.size; j++ { + id := int64(i) + lead := es[j].node.Leader() + term := es[j].node.Term() + fit := true + if j == i { + if lead < id { + fit = false + } + } else { + if lead != prevLead || term != prevTerm { + fit = false + } + } + if !fit { + j = i - 1 + runtime.Gosched() + continue + } + prevLead = lead + prevTerm = term + } + + index := es[i].Index() + es[i].Remove(i) + + // i-th machine cannot be promised to apply the removal command of + // its own due to our non-optimized raft. + // TODO(yichengq): it should work when + // https://github.com/etcd-team/etcd/pull/7 is merged. + for j := i + 1; j < tt.size; j++ { + w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1) + if err != nil { + t.Errorf("#%d on %d: %v", i, j, err) + break + } + v := <-w.EventChan + ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i) + if v.Node.Key != ww { + t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww) + } + } + + // may need to wait for msgDenial + // TODO(yichengq): no need to sleep here when previous issue is merged. 
+ if es[i].mode == stop { + continue + } + time.Sleep(defaultElection * defaultTickDuration) + if g := es[i].mode; g != stop { + t.Errorf("#%d: mode = %d, want stop", i, g) + } + } + + for i := range hs { + es[len(hs)-i-1].Stop() + } + for i := range hs { + hs[len(hs)-i-1].Close() + } + afterTest(t) + } +} + func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { bootstrapper := 0 es := make([]*Server, number)