From 83e1fe77c85b2f928bb6c41311ea69f32358d0b5 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 15 Jul 2014 11:55:58 -0700 Subject: [PATCH] server: refactor add --- etcd/etcd.go | 64 ++++++++++++++++++++++++++++------------------- etcd/etcd_test.go | 37 +++++++++++++++------------ 2 files changed, 59 insertions(+), 42 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 725fe16fa..3632b8f0f 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -42,7 +42,7 @@ const ( ) var ( - removeTmpErr = fmt.Errorf("remove: try again") + tmpErr = fmt.Errorf("try again") serverStopErr = fmt.Errorf("server is stopped") ) @@ -199,7 +199,6 @@ func (s *Server) Join() { func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error { p := path.Join(v2machineKVPrefix, fmt.Sprint(id)) - index := s.Index() _, err := s.Get(p, false, false) if err == nil { @@ -208,26 +207,33 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error { if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound { return err } - for { - select { - case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}: - case <-s.stop: - return fmt.Errorf("server is stopped") - } - w, err := s.Watch(p, true, false, index+1) - if err != nil { - return err - } - select { - case v := <-w.EventChan: - if v.Action == store.Set { - return nil - } - index = v.Index() - case <-time.After(4 * defaultHeartbeat * s.tickDuration): - case <-s.stop: - return fmt.Errorf("server is stopped") + + w, err := s.Watch(p, true, false, 0) + if err != nil { + log.Println("add error:", err) + return tmpErr + } + + select { + case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}: + case <-s.stop: + return serverStopErr + } + + select { + case v := <-w.EventChan: + if v.Action == store.Set { + return nil } + log.Println("add error: action =", v.Action) + return tmpErr + case <-time.After(4 * defaultHeartbeat * s.tickDuration): + w.Remove() + log.Println("add error: wait timeout") + return tmpErr + case <-s.stop: + w.Remove() + return serverStopErr } } @@ -249,7 +255,8 @@ func (s *Server) Remove(id int64) error { // removal target is self w, err := s.Watch(p, true, false, v.Index()+1) if err != nil { - return removeTmpErr + log.Println("remove error:", err) + return tmpErr } select { @@ -257,10 +264,12 @@ func (s *Server) Remove(id int64) error { if v.Action == store.Delete { return nil } - return removeTmpErr + log.Println("remove error: action =", v.Action) + return tmpErr case <-time.After(4 * defaultHeartbeat * s.tickDuration): w.Remove() - return removeTmpErr + log.Println("remove error: wait timeout") + return tmpErr case <-s.stop: w.Remove() return serverStopErr @@ -284,18 +293,21 @@ func (s *Server) run() { func (s *Server) runParticipant() { node := s.node - addNodeC := s.addNodeC - removeNodeC := s.removeNodeC recv := s.t.recv ticker := time.NewTicker(s.tickDuration) v2SyncTicker := time.NewTicker(time.Millisecond * 500) var proposal chan v2Proposal + var addNodeC, removeNodeC chan raft.Config for { if node.HasLeader() { proposal = s.proposal + addNodeC = s.addNodeC + removeNodeC = s.removeNodeC } else { proposal = nil + addNodeC = nil + removeNodeC = nil } select { case p := <-proposal: diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 8cfa9a63b..d4332e6f3 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -5,7 +5,6 @@ import ( "net/http" "net/http/httptest" "net/url" - "runtime" "testing" "time" @@ -101,31 +100,37 @@ func TestAdd(t *testing.T) { go es[0].Bootstrap() for i := 1; i < tt.size; i++ { - var index uint64 + id := int64(i) for { lead := es[0].node.Leader() - if lead != -1 { - index = es[lead].Index() - ne := es[i] - if err := es[lead].Add(ne.id, ne.raftPubAddr, ne.pubAddr); err == nil { - break - } + if lead == -1 { + time.Sleep(defaultElection * es[0].tickDuration) + continue + } + + err := es[lead].Add(id, es[id].raftPubAddr, es[id].pubAddr) + if err == nil { + break + } + switch err { + case tmpErr: + time.Sleep(defaultElection * es[0].tickDuration) + case serverStopErr: + t.Fatalf("#%d on %d: unexpected stop", i, lead) + default: + t.Fatal(err) } - runtime.Gosched() } go es[i].run() for j := 0; j <= i; j++ { - w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1) + p := fmt.Sprintf("%s/%d", v2machineKVPrefix, id) + w, err := es[j].Watch(p, false, false, 1) if err != nil { t.Errorf("#%d on %d: %v", i, j, err) break } - v := <-w.EventChan - ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i) - if v.Node.Key != ww { - t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww) - } + <-w.EventChan } } @@ -169,7 +174,7 @@ func TestRemove(t *testing.T) { break } switch err { - case removeTmpErr: + case tmpErr: time.Sleep(defaultElection * 5 * time.Millisecond) case serverStopErr: if lead == id {