diff --git a/etcd/etcd.go b/etcd/etcd.go index 7dbf6387d..e560860ff 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -10,6 +10,7 @@ import ( "time" "github.com/coreos/etcd/config" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/store" ) @@ -53,6 +54,7 @@ type Server struct { proposal chan v2Proposal node *v2Raft + addNodeC chan raft.Config removeNodeC chan raft.Config t *transporter @@ -91,6 +93,7 @@ func New(c *config.Config, id int64) *Server { Node: raft.New(id, defaultHeartbeat, defaultElection), result: make(map[wait]chan interface{}), }, + addNodeC: make(chan raft.Config), removeNodeC: make(chan raft.Config), t: newTransporter(tc), @@ -177,6 +180,37 @@ func (s *Server) Join() { s.run() } +func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error { + p := path.Join(v2machineKVPrefix, fmt.Sprint(id)) + index := s.Index() + + _, err := s.Get(p, false, false) + if err == nil { + return fmt.Errorf("existed node") + } + if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound { + return err + } + for { + if s.mode == stop { + return fmt.Errorf("server is stopped") + } + s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)} + w, err := s.Watch(p, true, false, index+1) + if err != nil { + return err + } + select { + case v := <-w.EventChan: + if v.Action == store.Set { + return nil + } + index = v.Index() + case <-time.After(4 * defaultHeartbeat * s.tickDuration): + } + } +} + func (s *Server) Remove(id int64) error { p := path.Join(v2machineKVPrefix, fmt.Sprint(id)) index := s.Index() @@ -221,6 +255,7 @@ func (s *Server) run() { func (s *Server) runParticipant() { node := s.node + addNodeC := s.addNodeC removeNodeC := s.removeNodeC recv := s.t.recv ticker := time.NewTicker(s.tickDuration) @@ -236,6 +271,8 @@ func (s *Server) runParticipant() { select { case p := <-proposal: node.Propose(p) + case c := <-addNodeC: + node.UpdateConf(raft.AddNode, &c) case c := <-removeNodeC: node.UpdateConf(raft.RemoveNode, &c) case msg := <-recv: diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 5965c87e7..307047d99 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -76,6 +76,69 @@ func TestV2Redirect(t *testing.T) { afterTest(t) } +func TestAdd(t *testing.T) { + tests := []struct { + size int + round int + }{ + {3, 5}, + {4, 5}, + {5, 5}, + {6, 5}, + } + + for _, tt := range tests { + es := make([]*Server, tt.size) + hs := make([]*httptest.Server, tt.size) + for i := 0; i < tt.size; i++ { + c := config.New() + if i > 0 { + c.Peers = []string{hs[0].URL} + } + es[i], hs[i] = initTestServer(c, int64(i), false) + } + + go es[0].Bootstrap() + + for i := 1; i < tt.size; i++ { + var index uint64 + for { + lead := es[0].node.Leader() + if lead != -1 { + index = es[lead].Index() + ne := es[i] + if err := es[lead].Add(ne.id, ne.raftPubAddr, ne.pubAddr); err == nil { + break + } + } + runtime.Gosched() + } + go es[i].run() + + for j := 0; j <= i; j++ { + w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1) + if err != nil { + t.Errorf("#%d on %d: %v", i, j, err) + break + } + v := <-w.EventChan + ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i) + if v.Node.Key != ww { + t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww) + } + } + } + + for i := range hs { + es[len(hs)-i-1].Stop() + } + for i := range hs { + hs[len(hs)-i-1].Close() + } + afterTest(t) + } +} + func TestRemove(t *testing.T) { tests := []struct { size int @@ -155,21 +218,7 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { for i := range es { c := config.New() c.Peers = []string{seed} - es[i] = New(c, int64(i)) - es[i].SetTick(time.Millisecond * 5) - m := http.NewServeMux() - m.Handle("/", es[i]) - m.Handle("/raft", es[i].t) - m.Handle("/raft/", es[i].t) - - if tls { - hs[i] = httptest.NewTLSServer(m) - } else { - hs[i] = httptest.NewServer(m) - } - - es[i].raftPubAddr = hs[i].URL - es[i].pubAddr = hs[i].URL + es[i], hs[i] = initTestServer(c, int64(i), tls) if i == bootstrapper { seed = hs[i].URL @@ -188,6 +237,25 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { return es, hs } +func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptest.Server) { + e = New(c, id) + e.SetTick(time.Millisecond * 5) + m := http.NewServeMux() + m.Handle("/", e) + m.Handle("/raft", e.t) + m.Handle("/raft/", e.t) + + if tls { + h = httptest.NewTLSServer(m) + } else { + h = httptest.NewServer(m) + } + + e.raftPubAddr = h.URL + e.pubAddr = h.URL + return +} + func waitCluster(t *testing.T, es []*Server) { n := len(es) for i, e := range es {