diff --git a/etcd/etcd.go b/etcd/etcd.go
index f33dcf995..4a1fb0f38 100644
--- a/etcd/etcd.go
+++ b/etcd/etcd.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log"
 	"net/http"
+	"net/url"
 	"path"
 	"time"
 
@@ -44,8 +45,9 @@ const (
 )
 
 var (
-	tmpErr        = fmt.Errorf("try again")
-	serverStopErr = fmt.Errorf("server is stopped")
+	tmpErr            = fmt.Errorf("try again")
+	raftStopErr       = fmt.Errorf("raft is stopped")
+	noneId      int64 = -1
 )
 
 type Server struct {
@@ -56,21 +58,30 @@ type Server struct {
 	id           int64
 	pubAddr      string
 	raftPubAddr  string
-	nodes        map[string]bool
 	tickDuration time.Duration
+	nodes        map[string]bool
+	client       *v2client
+
+	// participant mode vars
 	proposal    chan v2Proposal
 	node        *v2Raft
 	addNodeC    chan raft.Config
 	removeNodeC chan raft.Config
 	t           *transporter
-	client      *v2client
+
+	// standby mode vars
+	leader      int64
+	leaderAddr  string
+	clusterConf *config.ClusterConfig
 
 	store.Store
 
-	stop chan struct{}
+	modeC chan int
+	stop  chan struct{}
 
-	http.Handler
+	participantHandler http.Handler
+	standbyHandler     http.Handler
 }
 
 func New(c *config.Config, id int64) *Server {
@@ -95,21 +106,20 @@ func New(c *config.Config, id int64) *Server {
 		pubAddr:      c.Addr,
 		raftPubAddr:  c.Peer.Addr,
 		nodes:        make(map[string]bool),
+		client:       newClient(tc),
 		tickDuration: defaultTickDuration,
-		proposal:     make(chan v2Proposal, maxBufferedProposal),
-		node: &v2Raft{
-			Node:   raft.New(id, defaultHeartbeat, defaultElection),
-			result: make(map[wait]chan interface{}),
-		},
-		addNodeC:    make(chan raft.Config),
-		removeNodeC: make(chan raft.Config),
-		t:           newTransporter(tc),
-		client:      newClient(tc),
 		Store:        store.New(),
-		stop:         make(chan struct{}),
+		modeC:        make(chan int, 10),
+		stop:         make(chan struct{}),
 	}
+	node := &v2Raft{
+		Node:   raft.New(id, defaultHeartbeat, defaultElection),
+		result: make(map[wait]chan interface{}),
+	}
+	t := newTransporter(tc)
+	s.initParticipant(node, t)
 
 	for _, seed := range c.Peers {
 		s.nodes[seed] = true
@@ -123,7 +133,10 @@ func New(c *config.Config, id int64) *Server {
 	m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats))
 	m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig))
 	m.Handle(v2adminMachinesPrefix, handlerErr(s.serveAdminMachines))
-	s.Handler = m
+	s.participantHandler = m
+	m = http.NewServeMux()
+	m.Handle("/", handlerErr(s.serveRedirect))
+	s.standbyHandler = m
 	return s
 }
 
@@ -132,7 +145,7 @@ func (s *Server) SetTick(d time.Duration) {
 }
 
 func (s *Server) RaftHandler() http.Handler {
-	return s.t
+	return http.HandlerFunc(s.ServeHTTPRaft)
 }
 
 func (s *Server) ClusterConfig() *config.ClusterConfig {
@@ -216,10 +229,15 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 		return tmpErr
 	}
 
+	if s.mode != participant {
+		return raftStopErr
+	}
 	select {
 	case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
-	case <-s.stop:
-		return serverStopErr
+	default:
+		w.Remove()
+		log.Println("unable to send out addNode proposal")
+		return tmpErr
 	}
 
 	select {
@@ -229,13 +247,10 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 		}
 		log.Println("add error: action =", v.Action)
 		return tmpErr
-	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+	case <-time.After(6 * defaultHeartbeat * s.tickDuration):
 		w.Remove()
 		log.Println("add error: wait timeout")
 		return tmpErr
-	case <-s.stop:
-		w.Remove()
-		return serverStopErr
 	}
 }
 
@@ -247,10 +262,14 @@ func (s *Server) Remove(id int64) error {
 		return nil
 	}
 
+	if s.mode != participant {
+		return raftStopErr
+	}
 	select {
 	case s.removeNodeC <- raft.Config{NodeId: id}:
-	case <-s.stop:
-		return serverStopErr
+	default:
+		log.Println("unable to send out removeNode proposal")
+		return tmpErr
 	}
 
 	// TODO(xiangli): do not need to watch if the
@@ -268,18 +287,56 @@ func (s *Server) Remove(id int64) error {
 		}
 		log.Println("remove error: action =", v.Action)
 		return tmpErr
-	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+	case <-time.After(6 * defaultHeartbeat * s.tickDuration):
 		w.Remove()
 		log.Println("remove error: wait timeout")
 		return tmpErr
-	case <-s.stop:
-		w.Remove()
-		return serverStopErr
 	}
 }
 
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	switch s.mode {
+	case participant:
+		s.participantHandler.ServeHTTP(w, r)
+	case standby:
+		s.standbyHandler.ServeHTTP(w, r)
+	case stop:
+		http.Error(w, "server is stopped", http.StatusInternalServerError)
+	}
+}
+
+func (s *Server) ServeHTTPRaft(w http.ResponseWriter, r *http.Request) {
+	switch s.mode {
+	case participant:
+		s.t.ServeHTTP(w, r)
+	case standby:
+		http.NotFound(w, r)
+	case stop:
+		http.Error(w, "server is stopped", http.StatusInternalServerError)
+	}
+}
+
+func (s *Server) initParticipant(node *v2Raft, t *transporter) {
+	s.proposal = make(chan v2Proposal, maxBufferedProposal)
+	s.node = node
+	s.addNodeC = make(chan raft.Config, 1)
+	s.removeNodeC = make(chan raft.Config, 1)
+	s.t = t
+}
+
+func (s *Server) initStandby(leader int64, leaderAddr string, conf *config.ClusterConfig) {
+	s.leader = leader
+	s.leaderAddr = leaderAddr
+	s.clusterConf = conf
+}
+
 func (s *Server) run() {
 	for {
+		select {
+		case s.modeC <- s.mode:
+		default:
+		}
+
 		switch s.mode {
 		case participant:
 			s.runParticipant()
@@ -298,7 +355,7 @@ func (s *Server) runParticipant() {
 	recv := s.t.recv
 	ticker := time.NewTicker(s.tickDuration)
 	v2SyncTicker := time.NewTicker(time.Millisecond * 500)
-	defer node.StopProposalWaiters()
+	defer s.node.StopProposalWaiters()
 
 	var proposal chan v2Proposal
 	var addNodeC, removeNodeC chan raft.Config
@@ -332,16 +389,54 @@ func (s *Server) runParticipant() {
 		s.apply(node.Next())
 		s.send(node.Msgs())
 		if node.IsRemoved() {
-			// TODO: delete it after standby is implemented
-			log.Printf("Node: %d removed from participants\n", s.id)
-			s.Stop()
-			return
+			break
 		}
 	}
+
+	log.Printf("Node: %d demoted to standby mode\n", s.id)
+	leader := noneId
+	leaderAddr := ""
+	if s.node.HasLeader() && !s.node.IsLeader() {
+		leader = s.node.Leader()
+		leaderAddr = s.fetchAddrFromStore(leader)
+	}
+	conf := s.ClusterConfig()
+	s.initStandby(leader, leaderAddr, conf)
+	s.mode = standby
+	return
 }
 
 func (s *Server) runStandby() {
-	panic("unimplemented")
+	syncDuration := time.Duration(int64(s.clusterConf.SyncInterval * float64(time.Second)))
+	for {
+		select {
+		case <-time.After(syncDuration):
+		case <-s.stop:
+			log.Printf("Node: %d stopped\n", s.id)
+			return
+		}
+
+		if err := s.syncCluster(); err != nil {
+			continue
+		}
+		if err := s.standbyJoin(s.leaderAddr); err != nil {
+			continue
+		}
+		break
+	}
+
+	log.Printf("Node: %d promoted to participant mode\n", s.id)
+	// TODO(yichengq): use old v2Raft
+	// 1. reject proposal in leader state when sm is removed
+	// 2. record removeIndex in node to ignore msgDenial and old removal
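+	// Join succeeded: rebuild the store and the raft state machine from
+	// scratch, then re-enter the cluster as a fresh participant.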
+	s.Store = store.New()
+	node := &v2Raft{
+		Node:   raft.New(s.id, defaultHeartbeat, defaultElection),
+		result: make(map[wait]chan interface{}),
+	}
+	s.initParticipant(node, s.t)
+	s.mode = participant
+	return
 }
 
 func (s *Server) apply(ents []raft.Entry) {
@@ -376,10 +471,9 @@ func (s *Server) apply(ents []raft.Entry) {
 				break
 			}
 			log.Printf("Remove Node %x\n", cfg.NodeId)
+			delete(s.nodes, s.fetchAddrFromStore(cfg.NodeId))
 			p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
-			if _, err := s.Store.Delete(p, false, false); err == nil {
-				delete(s.nodes, cfg.Addr)
-			}
+			s.Store.Delete(p, false, false)
 		default:
 			panic("unimplemented")
 		}
@@ -413,6 +507,17 @@ func (s *Server) send(msgs []raft.Message) {
 	}
 }
 
+func (s *Server) setClusterConfig(c *config.ClusterConfig) error {
+	b, err := json.Marshal(c)
+	if err != nil {
+		return err
+	}
+	if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (s *Server) fetchAddr(nodeId int64) error {
 	for seed := range s.nodes {
 		if err := s.t.fetchAddr(seed, nodeId); err == nil {
@@ -421,3 +526,29 @@ func (s *Server) fetchAddr(nodeId int64) error {
 	}
 	return fmt.Errorf("cannot fetch the address of node %d", nodeId)
 }
+
+func (s *Server) fetchAddrFromStore(nodeId int64) string {
+	p := path.Join(v2machineKVPrefix, fmt.Sprint(nodeId))
+	if ev, err := s.Get(p, false, false); err == nil {
+		if m, err := url.ParseQuery(*ev.Node.Value); err == nil {
+			return m["raft"][0]
+		}
+	}
+	return ""
+}
+
+func (s *Server) standbyJoin(addr string) error {
+	if s.clusterConf.ActiveSize <= len(s.nodes) {
+		return fmt.Errorf("full cluster")
+	}
+	info := &context{
+		MinVersion: store.MinVersion(),
+		MaxVersion: store.MaxVersion(),
+		ClientURL:  s.pubAddr,
+		PeerURL:    s.raftPubAddr,
+	}
+	if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/etcd/etcd_functional_test.go b/etcd/etcd_functional_test.go
index b50f6702e..506f29f8f 100644
--- a/etcd/etcd_functional_test.go
+++ b/etcd/etcd_functional_test.go
@@ -104,7 +104,17 @@ type leadterm struct {
 	term int64
 }
 
-func waitLeader(es []*Server) {
+func waitActiveLeader(es []*Server) (lead, term int64) {
+	for {
+		if l, t := waitLeader(es); l >= 0 && es[l].mode == participant {
+			return l, t
+		}
+	}
+}
+
+// waitLeader waits until all live servers are observed to agree on the same leader.
+// WARNING: the lead returned is not guaranteed to be the actual leader.
+func waitLeader(es []*Server) (lead, term int64) {
 	for {
 		ls := make([]leadterm, 0, len(es))
 		for i := range es {
@@ -117,7 +127,7 @@ func waitLeader(es []*Server) {
 			}
 		}
 		if isSameLead(ls) {
-			return
+			return ls[0].lead, ls[0].term
 		}
 		time.Sleep(es[0].tickDuration * defaultElection)
 	}
diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go
index 70841b0d4..a0ea6c121 100644
--- a/etcd/etcd_test.go
+++ b/etcd/etcd_test.go
@@ -2,6 +2,7 @@ package etcd
 
 import (
 	"fmt"
+	"math/rand"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -9,6 +10,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/config"
+	"github.com/coreos/etcd/store"
 )
 
 func TestMultipleNodes(t *testing.T) {
@@ -107,7 +109,7 @@ func TestAdd(t *testing.T) {
 			switch err {
 			case tmpErr:
 				time.Sleep(defaultElection * es[0].tickDuration)
-			case serverStopErr:
+			case raftStopErr:
 				t.Fatalf("#%d on %d: unexpected stop", i, lead)
 			default:
 				t.Fatal(err)
@@ -147,6 +149,8 @@ func TestRemove(t *testing.T) {
 	// not 100 percent safe in our raft.
 	// TODO(yichengq): improve it later.
 	for i := 0; i < tt-2; i++ {
+		<-es[i].modeC
+
 		id := int64(i)
 		send := id
 		for {
@@ -168,15 +172,19 @@ func TestRemove(t *testing.T) {
 			switch err {
 			case tmpErr:
 				time.Sleep(defaultElection * 5 * time.Millisecond)
-			case serverStopErr:
+			case raftStopErr:
 				if lead == id {
 					break
 				}
 			default:
 				t.Fatal(err)
 			}
+
+		}
+
+		if g := <-es[i].modeC; g != standby {
+			t.Errorf("#%d: mode = %d, want standby", i, g)
 		}
-		<-es[i].stop
 	}
 
 	for i := range es {
@@ -189,6 +197,79 @@ func TestRemove(t *testing.T) {
 	afterTest(t)
 }
 
+// TODO(yichengq): cannot handle previous msgDenial correctly now
+func TestModeSwitch(t *testing.T) {
+	size := 5
+	round := 1
+
+	for i := 0; i < size; i++ {
+		es, hs := buildCluster(size, false)
+		waitCluster(t, es)
+
+		if g := <-es[i].modeC; g != participant {
+			t.Fatalf("#%d: mode = %d, want participant", i, g)
+		}
+
+		config := config.NewClusterConfig()
+		config.SyncInterval = 0
+		id := int64(i)
+		for j := 0; j < round; j++ {
+			lead, _ := waitActiveLeader(es)
+			// the cluster only demotes followers
+			if lead == id {
+				continue
+			}
+
+			config.ActiveSize = size - 1
+			if err := es[lead].setClusterConfig(config); err != nil {
+				t.Fatalf("#%d: setClusterConfig err = %v", i, err)
+			}
+			if err := es[lead].Remove(id); err != nil {
+				t.Fatalf("#%d: remove err = %v", i, err)
+			}
+
+			if g := <-es[i].modeC; g != standby {
+				t.Fatalf("#%d: mode = %d, want standby", i, g)
+			}
+			if g := len(es[i].modeC); g != 0 {
+				t.Fatalf("#%d: mode to %d, want remain", i, <-es[i].modeC)
+			}
+
+			if g := es[i].leader; g != lead {
+				t.Errorf("#%d: lead = %d, want %d", i, g, lead)
+			}
+
+			config.ActiveSize = size
+			if err := es[lead].setClusterConfig(config); err != nil {
+				t.Fatalf("#%d: setClusterConfig err = %v", i, err)
+			}
+
+			if g := <-es[i].modeC; g != participant {
+				t.Fatalf("#%d: mode = %d, want participant", i, g)
+			}
+			// if g := len(es[i].modeC); g != 0 {
+			// 	t.Fatalf("#%d: mode to %d, want remain", i, <-es[i].modeC)
+			// }
+
+			// if err := checkParticipant(i, es); err != nil {
+			// 	t.Errorf("#%d: check alive err = %v", i, err)
+			// }
+		}
+
+		// if g := len(es[i].modeC); g != 0 {
+		// 	t.Fatalf("#%d: mode to %d, want remain", i, <-es[i].modeC)
+		// }
+
+		for i := range hs {
+			es[len(hs)-i-1].Stop()
+		}
+		for i := range hs {
+			hs[len(hs)-i-1].Close()
+		}
+	}
+	afterTest(t)
+}
+
 func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 	bootstrapper := 0
 	es := make([]*Server, number)
@@ -197,7 +278,9 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 
 	for i := range es {
 		c := config.New()
-		c.Peers = []string{seed}
+		if seed != "" {
+			c.Peers = []string{seed}
+		}
 		es[i], hs[i] = initTestServer(c, int64(i), tls)
 
 		if i == bootstrapper {
@@ -262,3 +345,24 @@ func waitCluster(t *testing.T, es []*Server) {
 		}
 	}
 }
+
+// checkParticipant checks that the i-th server works well as a participant.
+func checkParticipant(i int, es []*Server) error {
+	lead, _ := waitActiveLeader(es)
+	key := fmt.Sprintf("/%d", rand.Int31())
+	ev, err := es[lead].Set(key, false, "bar", store.Permanent)
+	if err != nil {
+		return err
+	}
+
+	w, err := es[i].Watch(key, false, false, ev.Index())
+	if err != nil {
+		return err
+	}
+	select {
+	case <-w.EventChan:
+	case <-time.After(8 * defaultHeartbeat * es[i].tickDuration):
+		return fmt.Errorf("watch timeout")
+	}
+	return nil
+}
diff --git a/etcd/v2_admin.go b/etcd/v2_admin.go
index 293d3d036..7be3c08f9 100644
--- a/etcd/v2_admin.go
+++ b/etcd/v2_admin.go
@@ -45,11 +45,7 @@ func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error
 			return err
 		}
 		c.Sanitize()
-		b, err := json.Marshal(c)
-		if err != nil {
-			return err
-		}
-		if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil {
+		if err := s.setClusterConfig(c); err != nil {
 			return err
 		}
 	default:
diff --git a/etcd/v2_http.go b/etcd/v2_http.go
index 248c66e9c..61a12ff04 100644
--- a/etcd/v2_http.go
+++ b/etcd/v2_http.go
@@ -111,16 +111,24 @@ func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) erro
 		return fmt.Errorf("failed to parse node entry: %s", *e.Node.Value)
 	}
 
-	originalURL := r.URL
-	redirectURL, err := url.Parse(m["etcd"][0])
+	redirectAddr, err := s.buildRedirectURL(m["etcd"][0], r.URL)
 	if err != nil {
-		log.Println("redirect cannot parse url:", err)
-		return fmt.Errorf("redirect cannot parse url: %v", err)
+		log.Println("redirect cannot build new url:", err)
+		return err
+	}
+
+	http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
+	return nil
+}
+
+func (s *Server) buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) {
+	redirectURL, err := url.Parse(redirectAddr)
+	if err != nil {
+		return "", fmt.Errorf("redirect cannot parse url: %v", err)
 	}
 	redirectURL.Path = originalURL.Path
 	redirectURL.RawQuery = originalURL.RawQuery
 	redirectURL.Fragment = originalURL.Fragment
 
-	http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
-	return nil
+	return redirectURL.String(), nil
 }
diff --git a/etcd/v2_raft.go b/etcd/v2_raft.go
index cf35252cf..3a70af77f 100644
--- a/etcd/v2_raft.go
+++ b/etcd/v2_raft.go
@@ -48,7 +48,7 @@ func (r *v2Raft) Sync() {
 
 func (r *v2Raft) StopProposalWaiters() {
 	for k, ch := range r.result {
-		ch <- fmt.Errorf("server is stopped or removed from participant")
+		ch <- raftStopErr
 		delete(r.result, k)
 	}
 }
diff --git a/etcd/v2_standby.go b/etcd/v2_standby.go
new file mode 100644
index 000000000..95a462d10
--- /dev/null
+++ b/etcd/v2_standby.go
@@ -0,0 +1,47 @@
+package etcd
+
+import (
+	"fmt"
+	"net/http"
+	"strconv"
+)
+
+func (s *Server) serveRedirect(w http.ResponseWriter, r *http.Request) error {
+	if s.leader == noneId {
+		return fmt.Errorf("no leader in the cluster")
+	}
+	redirectAddr, err := s.buildRedirectURL(s.leaderAddr, r.URL)
+	if err != nil {
+		return err
+	}
+	http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
+	return nil
+}
+
+func (s *Server) syncCluster() error {
+	for node := range s.nodes {
+		machines, err := s.client.GetMachines(node)
+		if err != nil {
+			continue
+		}
+		config, err := s.client.GetClusterConfig(node)
+		if err != nil {
+			continue
+		}
+		s.nodes = make(map[string]bool)
+		for _, machine := range machines {
+			s.nodes[machine.PeerURL] = true
+			if machine.State == stateLeader {
+				id, err := strconv.ParseInt(machine.Name, 0, 64)
+				if err != nil {
+					return err
+				}
+				s.leader = id
+				s.leaderAddr = machine.PeerURL
+			}
+		}
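+		// Cache the cluster config fetched from this reachable peer;
+		// standbyJoin consults its ActiveSize before trying to rejoin.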
+		s.clusterConf = config
+		return nil
+	}
+	return fmt.Errorf("unreachable cluster")
+}
diff --git a/etcd/v2_store.go b/etcd/v2_store.go
index 31abc9280..1c044daac 100644
--- a/etcd/v2_store.go
+++ b/etcd/v2_store.go
@@ -61,6 +61,9 @@ func (s *Server) do(c *cmd) (*store.Event, error) {
 		ret: make(chan interface{}, 1),
 	}
 
+	if s.mode != participant {
+		return nil, raftStopErr
+	}
 	select {
 	case s.proposal <- p:
 	default:
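For reference, a minimal, self-contained sketch of the non-blocking mode broadcast that Server.run performs on each loop iteration. The mode constants and the modeC buffer size mirror the patch; everything else here is illustrative and not part of the diff.

package main

import "fmt"

// Mode constants as in the patch: a server is a raft participant,
// a standby, or stopped.
const (
	participant = iota
	standby
	stop
)

func main() {
	// Buffered like Server.modeC, so a burst of transitions is kept
	// even when no reader is currently draining the channel.
	modeC := make(chan int, 10)

	publish := func(mode int) {
		// The non-blocking send from run(): drop the notification
		// rather than stall the raft loop when the buffer is full.
		select {
		case modeC <- mode:
		default:
		}
	}

	publish(participant) // reported on entering run()
	publish(standby)     // after removal from the active cluster
	publish(participant) // after a successful standbyJoin

	// Tests drain the channel to observe transitions, as TestModeSwitch
	// does with <-es[i].modeC.
	for i := 0; i < 3; i++ {
		fmt.Println(<-modeC)
	}
}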