From 6d81aabd48974e8bd281110942a800365438944e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 18 Jul 2014 01:36:58 -0700 Subject: [PATCH] server: refactor server --- etcd/etcd.go | 515 ++++++---------------------------- etcd/etcd_functional_test.go | 20 +- etcd/etcd_test.go | 65 ++--- etcd/participant.go | 328 ++++++++++++++++++++++ etcd/peer.go | 28 +- etcd/standby.go | 137 +++++++++ etcd/v2_admin.go | 52 ++-- etcd/v2_apply.go | 32 +-- etcd/v2_http.go | 32 +-- etcd/v2_http_delete.go | 26 +- etcd/v2_http_endpoint_test.go | 8 +- etcd/v2_http_get.go | 24 +- etcd/v2_http_post.go | 12 +- etcd/v2_http_put.go | 52 ++-- etcd/v2_standby.go | 47 ---- etcd/v2_store.go | 35 ++- 16 files changed, 744 insertions(+), 669 deletions(-) create mode 100644 etcd/participant.go create mode 100644 etcd/standby.go delete mode 100644 etcd/v2_standby.go diff --git a/etcd/etcd.go b/etcd/etcd.go index e1262f3d0..14f543b59 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -2,88 +2,43 @@ package etcd import ( "crypto/tls" - "encoding/json" - "fmt" + "errors" "log" "net/http" - "net/url" - "path" "time" "github.com/coreos/etcd/config" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/store" ) const ( - defaultHeartbeat = 1 - defaultElection = 5 - - maxBufferedProposal = 128 - - defaultTickDuration = time.Millisecond * 100 - - v2machineKVPrefix = "/_etcd/machines" - v2configKVPrefix = "/_etcd/config" - - v2Prefix = "/v2/keys" - v2machinePrefix = "/v2/machines" - v2peersPrefix = "/v2/peers" - v2LeaderPrefix = "/v2/leader" - v2StoreStatsPrefix = "/v2/stats/store" - v2adminConfigPrefix = "/v2/admin/config" - v2adminMachinesPrefix = "/v2/admin/machines/" - raftPrefix = "/raft" -) -const ( - participant = iota - standby - stop + participantMode int64 = iota + standbyMode + stopMode ) var ( - tmpErr = fmt.Errorf("try again") - raftStopErr = fmt.Errorf("raft is stopped") - noneId int64 = -1 + stopErr = errors.New("stopped") ) type Server struct { - config *config.Config - - mode int - - id int64 - pubAddr string - raftPubAddr string - - nodes map[string]bool - peerHub *peerHub - + config *config.Config + id int64 + pubAddr string + raftPubAddr string tickDuration time.Duration - client *v2client - rh *raftHandler + mode atomicInt + nodes map[string]bool + p *participant + s *standby - // participant mode vars - proposal chan v2Proposal - addNodeC chan raft.Config - removeNodeC chan raft.Config - node *v2Raft - store.Store + client *v2client + peerHub *peerHub - // standby mode vars - leader int64 - leaderAddr string - clusterConf *config.ClusterConfig - - modeC chan int - stop chan struct{} - - participantHandler http.Handler - standbyHandler http.Handler + modeC chan int64 + stopc chan struct{} } func New(c *config.Config, id int64) *Server { @@ -105,406 +60,112 @@ func New(c *config.Config, id int64) *Server { tr := new(http.Transport) tr.TLSClientConfig = tc client := &http.Client{Transport: tr} - peerHub := newPeerHub(c.Peers, client) s := &Server{ - config: c, - id: id, - pubAddr: c.Addr, - raftPubAddr: c.Peer.Addr, - - nodes: make(map[string]bool), - - peerHub: peerHub, - + config: c, + id: id, + pubAddr: c.Addr, + raftPubAddr: c.Peer.Addr, tickDuration: defaultTickDuration, - client: newClient(tc), - rh: newRaftHandler(peerHub), + mode: atomicInt(stopMode), + nodes: make(map[string]bool), - node: &v2Raft{ - Node: raft.New(id, defaultHeartbeat, defaultElection), - result: make(map[wait]chan interface{}), - }, - Store: store.New(), + client: newClient(tc), + peerHub: newPeerHub(c.Peers, client), - modeC: make(chan int, 10), - stop: make(chan struct{}), + modeC: make(chan int64, 10), + stopc: make(chan struct{}), } - for _, seed := range c.Peers { s.nodes[seed] = true } - m := http.NewServeMux() - m.Handle(v2Prefix+"/", handlerErr(s.serveValue)) - m.Handle(v2machinePrefix, handlerErr(s.serveMachines)) - m.Handle(v2peersPrefix, handlerErr(s.serveMachines)) - m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader)) - m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats)) - m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig)) - m.Handle(v2adminMachinesPrefix, handlerErr(s.serveAdminMachines)) - s.participantHandler = m - m = http.NewServeMux() - m.Handle("/", handlerErr(s.serveRedirect)) - s.standbyHandler = m return s } -func (s *Server) SetTick(d time.Duration) { - s.tickDuration = d -} - -func (s *Server) RaftHandler() http.Handler { - return s.rh -} - -func (s *Server) Run() { - if len(s.config.Peers) == 0 { - s.Bootstrap() - } else { - s.Join() - } +func (s *Server) SetTick(tick time.Duration) { + s.tickDuration = tick } +// Stop stops the server elegently. func (s *Server) Stop() { - if s.mode == stop { + if s.mode.Get() == stopMode { return } - s.mode = stop - - s.rh.stop() - s.client.CloseConnections() - s.peerHub.stop() - close(s.stop) -} - -func (s *Server) Bootstrap() { - log.Println("starting a bootstrap node") - s.initParticipant() - s.node.Campaign() - s.node.Add(s.id, s.raftPubAddr, []byte(s.pubAddr)) - s.apply(s.node.Next()) - s.run() -} - -func (s *Server) Join() { - log.Println("joining cluster via peers", s.config.Peers) - s.initParticipant() - info := &context{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - ClientURL: s.pubAddr, - PeerURL: s.raftPubAddr, - } - - url := "" - for i := 0; i < 5; i++ { - for seed := range s.nodes { - if err := s.client.AddMachine(seed, fmt.Sprint(s.id), info); err == nil { - url = seed - break - } else { - log.Println(err) - } - } - if url != "" { - break - } - time.Sleep(100 * time.Millisecond) - } - s.nodes = map[string]bool{url: true} - - s.run() -} - -func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error { - p := path.Join(v2machineKVPrefix, fmt.Sprint(id)) - - _, err := s.Get(p, false, false) - if err == nil { - return nil - } - if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound { - return err - } - - w, err := s.Watch(p, true, false, 0) - if err != nil { - log.Println("add error:", err) - return tmpErr - } - - if s.mode != participant { - return raftStopErr - } - select { - case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}: - default: - w.Remove() - log.Println("unable to send out addNode proposal") - return tmpErr - } - - select { - case v := <-w.EventChan: - if v.Action == store.Set { - return nil - } - log.Println("add error: action =", v.Action) - return tmpErr - case <-time.After(6 * defaultHeartbeat * s.tickDuration): - w.Remove() - log.Println("add error: wait timeout") - return tmpErr - } -} - -func (s *Server) Remove(id int64) error { - p := path.Join(v2machineKVPrefix, fmt.Sprint(id)) - - v, err := s.Get(p, false, false) - if err != nil { - return nil - } - - if s.mode != participant { - return raftStopErr - } - select { - case s.removeNodeC <- raft.Config{NodeId: id}: - default: - log.Println("unable to send out removeNode proposal") - return tmpErr - } - - // TODO(xiangli): do not need to watch if the - // removal target is self - w, err := s.Watch(p, true, false, v.Index()+1) - if err != nil { - log.Println("remove error:", err) - return tmpErr - } - - select { - case v := <-w.EventChan: - if v.Action == store.Delete { - return nil - } - log.Println("remove error: action =", v.Action) - return tmpErr - case <-time.After(6 * defaultHeartbeat * s.tickDuration): - w.Remove() - log.Println("remove error: wait timeout") - return tmpErr - } + s.stopc <- struct{}{} + <-s.stopc } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - switch s.mode { - case participant: - s.participantHandler.ServeHTTP(w, r) - case standby: - s.standbyHandler.ServeHTTP(w, r) - case stop: - http.Error(w, "server is stopped", http.StatusInternalServerError) + switch s.mode.Get() { + case participantMode: + s.p.ServeHTTP(w, r) + case standbyMode: + s.s.ServeHTTP(w, r) + case stopMode: + http.NotFound(w, r) } } -func (s *Server) initParticipant() { - s.proposal = make(chan v2Proposal, maxBufferedProposal) - s.addNodeC = make(chan raft.Config, 1) - s.removeNodeC = make(chan raft.Config, 1) - s.rh.start() - s.mode = participant +func (s *Server) RaftHandler() http.Handler { + return http.HandlerFunc(s.ServeRaftHTTP) } -func (s *Server) initStandby() { - s.leader = noneId - s.leaderAddr = "" - s.clusterConf = config.NewClusterConfig() - s.mode = standby +func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) { + switch s.mode.Get() { + case participantMode: + s.p.raftHandler().ServeHTTP(w, r) + case standbyMode: + http.NotFound(w, r) + case stopMode: + http.NotFound(w, r) + } } -func (s *Server) run() { +func (s *Server) Run() { + runc := make(chan struct{}) + next := participantMode for { - select { - case s.modeC <- s.mode: - default: - } - switch s.mode { - case participant: - s.runParticipant() - case standby: - s.runStandby() - case stop: - return + switch next { + case participantMode: + s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration) + s.mode.Set(participantMode) + // TODO: it may block here. remove modeC later. + s.modeC <- s.mode.Get() + next = standbyMode + go func() { + s.p.run() + runc <- struct{}{} + }() + case standbyMode: + s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub) + s.mode.Set(standbyMode) + s.modeC <- s.mode.Get() + next = participantMode + go func() { + s.s.run() + runc <- struct{}{} + }() default: panic("unsupport mode") } - } -} - -func (s *Server) runParticipant() { - defer func() { - s.node.StopProposalWaiters() - s.rh.stop() - }() - node := s.node - recv := s.rh.recv - ticker := time.NewTicker(s.tickDuration) - v2SyncTicker := time.NewTicker(time.Millisecond * 500) - - var proposal chan v2Proposal - var addNodeC, removeNodeC chan raft.Config - for { - if node.HasLeader() { - proposal = s.proposal - addNodeC = s.addNodeC - removeNodeC = s.removeNodeC - } else { - proposal = nil - addNodeC = nil - removeNodeC = nil - } select { - case p := <-proposal: - node.Propose(p) - case c := <-addNodeC: - node.UpdateConf(raft.AddNode, &c) - case c := <-removeNodeC: - node.UpdateConf(raft.RemoveNode, &c) - case msg := <-recv: - node.Step(*msg) - case <-ticker.C: - node.Tick() - case <-v2SyncTicker.C: - node.Sync() - case <-s.stop: - log.Printf("Node: %d stopped\n", s.id) - return - } - s.apply(node.Next()) - s.send(node.Msgs()) - if node.IsRemoved() { - log.Printf("Node: %d removed to standby mode\n", s.id) - s.initStandby() + case <-runc: + case <-s.stopc: + switch s.mode.Get() { + case participantMode: + s.p.stop() + case standbyMode: + s.s.stop() + } + <-runc + s.mode.Set(stopMode) + s.modeC <- s.mode.Get() + s.client.CloseConnections() + s.peerHub.stop() + s.stopc <- struct{}{} return } } } - -func (s *Server) runStandby() { - var syncDuration time.Duration - for { - select { - case <-time.After(syncDuration): - case <-s.stop: - log.Printf("Node: %d stopped\n", s.id) - return - } - - if err := s.syncCluster(); err != nil { - log.Println("standby sync:", err) - continue - } - syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second)) - if s.clusterConf.ActiveSize <= len(s.nodes) { - continue - } - if err := s.joinByPeer(s.leaderAddr); err != nil { - log.Println("standby join:", err) - continue - } - log.Printf("Node: %d removed to participant mode\n", s.id) - // TODO(yichengq): use old v2Raft - // 1. reject proposal in leader state when sm is removed - // 2. record removeIndex in node to ignore msgDenial and old removal - s.node = &v2Raft{ - Node: raft.New(s.id, defaultHeartbeat, defaultElection), - result: make(map[wait]chan interface{}), - } - s.Store = store.New() - s.initParticipant() - return - } -} - -func (s *Server) apply(ents []raft.Entry) { - offset := s.node.Applied() - int64(len(ents)) + 1 - for i, ent := range ents { - switch ent.Type { - // expose raft entry type - case raft.Normal: - if len(ent.Data) == 0 { - continue - } - s.v2apply(offset+int64(i), ent) - case raft.AddNode: - cfg := new(raft.Config) - if err := json.Unmarshal(ent.Data, cfg); err != nil { - log.Println(err) - break - } - peer, err := s.peerHub.add(cfg.NodeId, cfg.Addr) - if err != nil { - log.Println(err) - break - } - peer.participate() - log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context)) - p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) - if _, err := s.Store.Set(p, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent); err == nil { - s.nodes[cfg.Addr] = true - } - case raft.RemoveNode: - cfg := new(raft.Config) - if err := json.Unmarshal(ent.Data, cfg); err != nil { - log.Println(err) - break - } - log.Printf("Remove Node %x\n", cfg.NodeId) - delete(s.nodes, s.fetchAddrFromStore(cfg.NodeId)) - peer, err := s.peerHub.peer(cfg.NodeId) - if err != nil { - log.Fatal("cannot get the added peer:", err) - } - peer.idle() - p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) - s.Store.Delete(p, false, false) - default: - panic("unimplemented") - } - } -} - -func (s *Server) send(msgs []raft.Message) { - for i := range msgs { - if err := s.peerHub.send(msgs[i]); err != nil { - log.Println("send:", err) - } - } -} - -func (s *Server) fetchAddrFromStore(nodeId int64) string { - p := path.Join(v2machineKVPrefix, fmt.Sprint(nodeId)) - if ev, err := s.Get(p, false, false); err == nil { - if m, err := url.ParseQuery(*ev.Node.Value); err == nil { - return m["raft"][0] - } - } - return "" -} - -func (s *Server) joinByPeer(addr string) error { - info := &context{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - ClientURL: s.pubAddr, - PeerURL: s.raftPubAddr, - } - if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil { - return err - } - return nil -} diff --git a/etcd/etcd_functional_test.go b/etcd/etcd_functional_test.go index 506f29f8f..b79e54d68 100644 --- a/etcd/etcd_functional_test.go +++ b/etcd/etcd_functional_test.go @@ -17,14 +17,14 @@ func TestKillLeader(t *testing.T) { waitCluster(t, es) waitLeader(es) - lead := es[0].node.Leader() + lead := es[0].p.node.Leader() es[lead].Stop() time.Sleep(es[0].tickDuration * defaultElection * 2) waitLeader(es) - if es[1].node.Leader() == 0 { - t.Errorf("#%d: lead = %d, want not 0", i, es[1].node.Leader()) + if es[1].p.node.Leader() == 0 { + t.Errorf("#%d: lead = %d, want not 0", i, es[1].p.node.Leader()) } for i := range es { @@ -81,7 +81,7 @@ func TestJoinThroughFollower(t *testing.T) { es[i], hs[i] = initTestServer(c, int64(i), false) } - go es[0].Bootstrap() + go es[0].Run() for i := 1; i < tt; i++ { go es[i].Run() @@ -106,7 +106,7 @@ type leadterm struct { func waitActiveLeader(es []*Server) (lead, term int64) { for { - if l, t := waitLeader(es); l >= 0 && es[l].mode == participant { + if l, t := waitLeader(es); l >= 0 && es[l].mode.Get() == participantMode { return l, t } } @@ -118,12 +118,12 @@ func waitLeader(es []*Server) (lead, term int64) { for { ls := make([]leadterm, 0, len(es)) for i := range es { - switch es[i].mode { - case participant: + switch es[i].mode.Get() { + case participantMode: ls = append(ls, getLead(es[i])) - case standby: + case standbyMode: //TODO(xiangli) add standby support - case stop: + case stopMode: } } if isSameLead(ls) { @@ -134,7 +134,7 @@ func waitLeader(es []*Server) (lead, term int64) { } func getLead(s *Server) leadterm { - return leadterm{s.node.Leader(), s.node.Term()} + return leadterm{s.p.node.Leader(), s.p.node.Term()} } func isSameLead(ls []leadterm) bool { diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index f6f46698e..795689db5 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -91,18 +91,19 @@ func TestAdd(t *testing.T) { es[i], hs[i] = initTestServer(c, int64(i), false) } - go es[0].Bootstrap() + go es[0].Run() + <-es[0].modeC for i := 1; i < tt; i++ { id := int64(i) for { - lead := es[0].node.Leader() + lead := es[0].p.node.Leader() if lead == -1 { time.Sleep(defaultElection * es[0].tickDuration) continue } - err := es[lead].Add(id, es[id].raftPubAddr, es[id].pubAddr) + err := es[lead].p.add(id, es[id].raftPubAddr, es[id].pubAddr) if err == nil { break } @@ -115,12 +116,12 @@ func TestAdd(t *testing.T) { t.Fatal(err) } } - es[i].initParticipant() - go es[i].run() + go es[i].Run() + <-es[i].modeC for j := 0; j <= i; j++ { p := fmt.Sprintf("%s/%d", v2machineKVPrefix, id) - w, err := es[j].Watch(p, false, false, 1) + w, err := es[j].p.Watch(p, false, false, 1) if err != nil { t.Errorf("#%d on %d: %v", i, j, err) break @@ -149,7 +150,7 @@ func TestRemove(t *testing.T) { lead, _ := waitLeader(es) config := config.NewClusterConfig() config.ActiveSize = 0 - if err := es[lead].setClusterConfig(config); err != nil { + if err := es[lead].p.setClusterConfig(config); err != nil { t.Fatalf("#%d: setClusterConfig err = %v", k, err) } @@ -157,8 +158,6 @@ func TestRemove(t *testing.T) { // not 100 percent safe in our raft. // TODO(yichengq): improve it later. for i := 0; i < tt-2; i++ { - <-es[i].modeC - id := int64(i) send := id for { @@ -167,13 +166,13 @@ func TestRemove(t *testing.T) { send = id } - lead := es[send].node.Leader() + lead := es[send].p.node.Leader() if lead == -1 { time.Sleep(defaultElection * 5 * time.Millisecond) continue } - err := es[lead].Remove(id) + err := es[lead].p.remove(id) if err == nil { break } @@ -190,7 +189,7 @@ func TestRemove(t *testing.T) { } - if g := <-es[i].modeC; g != standby { + if g := <-es[i].modeC; g != standbyMode { t.Errorf("#%d on %d: mode = %d, want standby", k, i, g) } } @@ -223,22 +222,18 @@ func TestBecomeStandby(t *testing.T) { } id := int64(i) - if g := <-es[i].modeC; g != participant { - t.Fatalf("#%d: mode = %d, want participant", i, g) - } - config := config.NewClusterConfig() config.SyncInterval = 1000 config.ActiveSize = size - 1 - if err := es[lead].setClusterConfig(config); err != nil { + if err := es[lead].p.setClusterConfig(config); err != nil { t.Fatalf("#%d: setClusterConfig err = %v", i, err) } - if err := es[lead].Remove(id); err != nil { + if err := es[lead].p.remove(id); err != nil { t.Fatalf("#%d: remove err = %v", i, err) } - if g := <-es[i].modeC; g != standby { + if g := <-es[i].modeC; g != standbyMode { t.Fatalf("#%d: mode = %d, want standby", i, g) } if g := len(es[i].modeC); g != 0 { @@ -246,12 +241,12 @@ func TestBecomeStandby(t *testing.T) { } for k := 0; k < 4; k++ { - if es[i].leader != noneId { + if es[i].s.leader != noneId { break } time.Sleep(20 * time.Millisecond) } - if g := es[i].leader; g != lead { + if g := es[i].s.leader; g != lead { t.Errorf("#%d: lead = %d, want %d", i, g, lead) } @@ -279,7 +274,7 @@ func TestModeSwitch(t *testing.T) { es, hs := buildCluster(size, false) waitCluster(t, es) - if g := <-es[i].modeC; g != participant { + if g := <-es[i].modeC; g != participantMode { t.Fatalf("#%d: mode = %d, want participant", i, g) } @@ -294,14 +289,14 @@ func TestModeSwitch(t *testing.T) { } config.ActiveSize = size - 1 - if err := es[lead].setClusterConfig(config); err != nil { + if err := es[lead].p.setClusterConfig(config); err != nil { t.Fatalf("#%d: setClusterConfig err = %v", i, err) } - if err := es[lead].Remove(id); err != nil { + if err := es[lead].p.remove(id); err != nil { t.Fatalf("#%d: remove err = %v", i, err) } - if g := <-es[i].modeC; g != standby { + if g := <-es[i].modeC; g != standbyMode { t.Fatalf("#%d: mode = %d, want standby", i, g) } if g := len(es[i].modeC); g != 0 { @@ -309,21 +304,21 @@ func TestModeSwitch(t *testing.T) { } for k := 0; k < 4; k++ { - if es[i].leader != noneId { + if es[i].s.leader != noneId { break } time.Sleep(20 * time.Millisecond) } - if g := es[i].leader; g != lead { + if g := es[i].s.leader; g != lead { t.Errorf("#%d: lead = %d, want %d", i, g, lead) } config.ActiveSize = size - if err := es[lead].setClusterConfig(config); err != nil { + if err := es[lead].p.setClusterConfig(config); err != nil { t.Fatalf("#%d: setClusterConfig err = %v", i, err) } - if g := <-es[i].modeC; g != participant { + if g := <-es[i].modeC; g != participantMode { t.Fatalf("#%d: mode = %d, want participant", i, g) } if g := len(es[i].modeC); g != 0 { @@ -364,17 +359,17 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { if i == bootstrapper { seed = hs[i].URL - go es[i].Bootstrap() } else { // wait for the previous configuration change to be committed // or this configuration request might be dropped - w, err := es[0].Watch(v2machineKVPrefix, true, false, uint64(i)) + w, err := es[0].p.Watch(v2machineKVPrefix, true, false, uint64(i)) if err != nil { panic(err) } <-w.EventChan - go es[i].Join() } + go es[i].Run() + <-es[i].modeC } return es, hs } @@ -404,7 +399,7 @@ func waitCluster(t *testing.T, es []*Server) { var index uint64 for k := 0; k < n; k++ { index++ - w, err := e.Watch(v2machineKVPrefix, true, false, index) + w, err := e.p.Watch(v2machineKVPrefix, true, false, index) if err != nil { panic(err) } @@ -429,12 +424,12 @@ func waitCluster(t *testing.T, es []*Server) { func checkParticipant(i int, es []*Server) error { lead, _ := waitActiveLeader(es) key := fmt.Sprintf("/%d", rand.Int31()) - ev, err := es[lead].Set(key, false, "bar", store.Permanent) + ev, err := es[lead].p.Set(key, false, "bar", store.Permanent) if err != nil { return err } - w, err := es[i].Watch(key, false, false, ev.Index()) + w, err := es[i].p.Watch(key, false, false, ev.Index()) if err != nil { return err } diff --git a/etcd/participant.go b/etcd/participant.go new file mode 100644 index 000000000..d71ded93a --- /dev/null +++ b/etcd/participant.go @@ -0,0 +1,328 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "net/url" + "path" + "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/store" +) + +const ( + defaultHeartbeat = 1 + defaultElection = 5 + + maxBufferedProposal = 128 + + defaultTickDuration = time.Millisecond * 100 + + v2machineKVPrefix = "/_etcd/machines" + v2configKVPrefix = "/_etcd/config" + + v2Prefix = "/v2/keys" + v2machinePrefix = "/v2/machines" + v2peersPrefix = "/v2/peers" + v2LeaderPrefix = "/v2/leader" + v2StoreStatsPrefix = "/v2/stats/store" + v2adminConfigPrefix = "/v2/admin/config" + v2adminMachinesPrefix = "/v2/admin/machines/" +) + +var ( + tmpErr = fmt.Errorf("try again") + raftStopErr = fmt.Errorf("raft is stopped") + noneId int64 = -1 +) + +type participant struct { + id int64 + pubAddr string + raftPubAddr string + seeds map[string]bool + tickDuration time.Duration + + client *v2client + peerHub *peerHub + + proposal chan v2Proposal + addNodeC chan raft.Config + removeNodeC chan raft.Config + node *v2Raft + store.Store + rh *raftHandler + + stopc chan struct{} + + *http.ServeMux +} + +func newParticipant(id int64, pubAddr string, raftPubAddr string, seeds map[string]bool, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant { + p := &participant{ + id: id, + pubAddr: pubAddr, + raftPubAddr: raftPubAddr, + seeds: seeds, + tickDuration: tickDuration, + + client: client, + peerHub: peerHub, + + proposal: make(chan v2Proposal, maxBufferedProposal), + addNodeC: make(chan raft.Config, 1), + removeNodeC: make(chan raft.Config, 1), + node: &v2Raft{ + Node: raft.New(id, defaultHeartbeat, defaultElection), + result: make(map[wait]chan interface{}), + }, + Store: store.New(), + rh: newRaftHandler(peerHub), + + stopc: make(chan struct{}), + + ServeMux: http.NewServeMux(), + } + + p.Handle(v2Prefix+"/", handlerErr(p.serveValue)) + p.Handle(v2machinePrefix, handlerErr(p.serveMachines)) + p.Handle(v2peersPrefix, handlerErr(p.serveMachines)) + p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader)) + p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats)) + p.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig)) + p.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines)) + return p +} + +func (p *participant) run() { + if len(p.seeds) == 0 { + log.Println("starting a bootstrap node") + p.node.Campaign() + p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr)) + p.apply(p.node.Next()) + } else { + log.Println("joining cluster via peers", p.seeds) + p.join() + } + + p.rh.start() + defer p.rh.stop() + + node := p.node + defer node.StopProposalWaiters() + + recv := p.rh.recv + ticker := time.NewTicker(p.tickDuration) + v2SyncTicker := time.NewTicker(time.Millisecond * 500) + + var proposal chan v2Proposal + var addNodeC, removeNodeC chan raft.Config + for { + if node.HasLeader() { + proposal = p.proposal + addNodeC = p.addNodeC + removeNodeC = p.removeNodeC + } else { + proposal = nil + addNodeC = nil + removeNodeC = nil + } + select { + case p := <-proposal: + node.Propose(p) + case c := <-addNodeC: + node.UpdateConf(raft.AddNode, &c) + case c := <-removeNodeC: + node.UpdateConf(raft.RemoveNode, &c) + case msg := <-recv: + node.Step(*msg) + case <-ticker.C: + node.Tick() + case <-v2SyncTicker.C: + node.Sync() + case <-p.stopc: + log.Printf("Participant %d stopped\n", p.id) + return + } + p.apply(node.Next()) + p.send(node.Msgs()) + if node.IsRemoved() { + log.Printf("Participant %d return\n", p.id) + return + } + } +} + +func (p *participant) stop() { + close(p.stopc) +} + +func (p *participant) raftHandler() http.Handler { + return p.rh +} + +func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error { + pp := path.Join(v2machineKVPrefix, fmt.Sprint(id)) + + _, err := p.Get(pp, false, false) + if err == nil { + return nil + } + if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound { + return err + } + + w, err := p.Watch(pp, true, false, 0) + if err != nil { + log.Println("add error:", err) + return tmpErr + } + + select { + case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}: + default: + w.Remove() + log.Println("unable to send out addNode proposal") + return tmpErr + } + + select { + case v := <-w.EventChan: + if v.Action == store.Set { + return nil + } + log.Println("add error: action =", v.Action) + return tmpErr + case <-time.After(6 * defaultHeartbeat * p.tickDuration): + w.Remove() + log.Println("add error: wait timeout") + return tmpErr + } +} + +func (p *participant) remove(id int64) error { + pp := path.Join(v2machineKVPrefix, fmt.Sprint(id)) + + v, err := p.Get(pp, false, false) + if err != nil { + return nil + } + + select { + case p.removeNodeC <- raft.Config{NodeId: id}: + default: + log.Println("unable to send out removeNode proposal") + return tmpErr + } + + // TODO(xiangli): do not need to watch if the + // removal target is self + w, err := p.Watch(pp, true, false, v.Index()+1) + if err != nil { + log.Println("remove error:", err) + return tmpErr + } + + select { + case v := <-w.EventChan: + if v.Action == store.Delete { + return nil + } + log.Println("remove error: action =", v.Action) + return tmpErr + case <-time.After(6 * defaultHeartbeat * p.tickDuration): + w.Remove() + log.Println("remove error: wait timeout") + return tmpErr + } +} + +func (p *participant) apply(ents []raft.Entry) { + offset := p.node.Applied() - int64(len(ents)) + 1 + for i, ent := range ents { + switch ent.Type { + // expose raft entry type + case raft.Normal: + if len(ent.Data) == 0 { + continue + } + p.v2apply(offset+int64(i), ent) + case raft.AddNode: + cfg := new(raft.Config) + if err := json.Unmarshal(ent.Data, cfg); err != nil { + log.Println(err) + break + } + peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr) + if err != nil { + log.Println(err) + break + } + peer.participate() + log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context)) + pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) + if _, err := p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent); err == nil { + p.seeds[cfg.Addr] = true + } + case raft.RemoveNode: + cfg := new(raft.Config) + if err := json.Unmarshal(ent.Data, cfg); err != nil { + log.Println(err) + break + } + log.Printf("Remove Node %x\n", cfg.NodeId) + delete(p.seeds, p.fetchAddrFromStore(cfg.NodeId)) + peer, err := p.peerHub.peer(cfg.NodeId) + if err != nil { + log.Fatal("cannot get the added peer:", err) + } + peer.idle() + pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) + p.Store.Delete(pp, false, false) + default: + panic("unimplemented") + } + } +} + +func (p *participant) send(msgs []raft.Message) { + for i := range msgs { + if err := p.peerHub.send(msgs[i]); err != nil { + log.Println("send:", err) + } + } +} + +func (p *participant) fetchAddrFromStore(nodeId int64) string { + pp := path.Join(v2machineKVPrefix, fmt.Sprint(nodeId)) + if ev, err := p.Get(pp, false, false); err == nil { + if m, err := url.ParseQuery(*ev.Node.Value); err == nil { + return m["raft"][0] + } + } + return "" +} + +func (p *participant) join() { + info := &context{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + ClientURL: p.pubAddr, + PeerURL: p.raftPubAddr, + } + + for i := 0; i < 5; i++ { + for seed := range p.seeds { + if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil { + return + } else { + log.Println(err) + } + } + time.Sleep(100 * time.Millisecond) + } +} diff --git a/etcd/peer.go b/etcd/peer.go index 223d41a59..a98c9f412 100644 --- a/etcd/peer.go +++ b/etcd/peer.go @@ -14,9 +14,9 @@ const ( ) const ( - // participant is defined in etcd.go - idle = iota + 1 - stopped + participantPeer = iota + idlePeer + stoppedPeer ) type peer struct { @@ -32,7 +32,7 @@ type peer struct { func newPeer(url string, c *http.Client) *peer { return &peer{ url: url, - status: idle, + status: idlePeer, c: c, } } @@ -41,7 +41,7 @@ func (p *peer) participate() { p.mu.Lock() defer p.mu.Unlock() p.queue = make(chan []byte) - p.status = participant + p.status = participantPeer for i := 0; i < maxInflight; i++ { p.wg.Add(1) go p.handle(p.queue) @@ -51,18 +51,18 @@ func (p *peer) participate() { func (p *peer) idle() { p.mu.Lock() defer p.mu.Unlock() - if p.status == participant { + if p.status == participantPeer { close(p.queue) } - p.status = idle + p.status = idlePeer } func (p *peer) stop() { p.mu.Lock() - if p.status == participant { + if p.status == participantPeer { close(p.queue) } - p.status = stopped + p.status = stoppedPeer p.mu.Unlock() p.wg.Wait() } @@ -79,13 +79,13 @@ func (p *peer) send(d []byte) error { defer p.mu.Unlock() switch p.status { - case participant: + case participantPeer: select { case p.queue <- d: default: return fmt.Errorf("reach max serving") } - case idle: + case idlePeer: if p.inflight.Get() > maxInflight { return fmt.Errorf("reach max idle") } @@ -94,7 +94,7 @@ func (p *peer) send(d []byte) error { p.post(d) p.wg.Done() }() - case stopped: + case stoppedPeer: return fmt.Errorf("sender stopped") } return nil @@ -122,3 +122,7 @@ func (i *atomicInt) Add(d int64) { func (i *atomicInt) Get() int64 { return atomic.LoadInt64((*int64)(i)) } + +func (i *atomicInt) Set(n int64) { + atomic.StoreInt64((*int64)(i), n) +} diff --git a/etcd/standby.go b/etcd/standby.go new file mode 100644 index 000000000..579adb3ec --- /dev/null +++ b/etcd/standby.go @@ -0,0 +1,137 @@ +package etcd + +import ( + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/coreos/etcd/config" + "github.com/coreos/etcd/store" +) + +type standby struct { + id int64 + pubAddr string + raftPubAddr string + + client *v2client + peerHub *peerHub + + nodes map[string]bool + + leader int64 + leaderAddr string + clusterConf *config.ClusterConfig + + stopc chan struct{} + + *http.ServeMux +} + +func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]bool, client *v2client, peerHub *peerHub) *standby { + s := &standby{ + id: id, + pubAddr: pubAddr, + raftPubAddr: raftPubAddr, + + client: client, + peerHub: peerHub, + + nodes: nodes, + + leader: noneId, + leaderAddr: "", + clusterConf: config.NewClusterConfig(), + + stopc: make(chan struct{}), + + ServeMux: http.NewServeMux(), + } + s.Handle("/", handlerErr(s.serveRedirect)) + return s +} + +func (s *standby) run() { + var syncDuration time.Duration + for { + select { + case <-time.After(syncDuration): + case <-s.stopc: + log.Printf("Standby %d stopped\n", s.id) + return + } + + if err := s.syncCluster(); err != nil { + log.Println("standby sync:", err) + continue + } + syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second)) + if s.clusterConf.ActiveSize <= len(s.nodes) { + continue + } + if err := s.joinByAddr(s.leaderAddr); err != nil { + log.Println("standby join:", err) + continue + } + return + } +} + +func (s *standby) stop() { + close(s.stopc) +} + +func (s *standby) serveRedirect(w http.ResponseWriter, r *http.Request) error { + if s.leader == noneId { + return fmt.Errorf("no leader in the cluster") + } + redirectAddr, err := buildRedirectURL(s.leaderAddr, r.URL) + if err != nil { + return err + } + http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect) + return nil +} + +func (s *standby) syncCluster() error { + for node := range s.nodes { + machines, err := s.client.GetMachines(node) + if err != nil { + continue + } + config, err := s.client.GetClusterConfig(node) + if err != nil { + continue + } + s.nodes = make(map[string]bool) + for _, machine := range machines { + s.nodes[machine.PeerURL] = true + if machine.State == stateLeader { + id, err := strconv.ParseInt(machine.Name, 0, 64) + if err != nil { + return err + } + s.leader = id + s.leaderAddr = machine.PeerURL + } + } + s.clusterConf = config + return nil + } + return fmt.Errorf("unreachable cluster") +} + +func (s *standby) joinByAddr(addr string) error { + info := &context{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + ClientURL: s.pubAddr, + PeerURL: s.raftPubAddr, + } + if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil { + return err + } + return nil +} diff --git a/etcd/v2_admin.go b/etcd/v2_admin.go index 2ebb8653c..8922224e4 100644 --- a/etcd/v2_admin.go +++ b/etcd/v2_admin.go @@ -34,19 +34,19 @@ type context struct { PeerURL string `json:"peerURL"` } -func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error { +func (p *participant) serveAdminConfig(w http.ResponseWriter, r *http.Request) error { switch r.Method { case "GET": case "PUT": - if !s.node.IsLeader() { - return s.redirect(w, r, s.node.Leader()) + if !p.node.IsLeader() { + return p.redirect(w, r, p.node.Leader()) } - c := s.ClusterConfig() + c := p.clusterConfig() if err := json.NewDecoder(r.Body).Decode(c); err != nil { return err } c.Sanitize() - if err := s.setClusterConfig(c); err != nil { + if err := p.setClusterConfig(c); err != nil { return err } default: @@ -54,20 +54,20 @@ func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(s.ClusterConfig()) + json.NewEncoder(w).Encode(p.clusterConfig()) return nil } -func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) error { +func (p *participant) serveAdminMachines(w http.ResponseWriter, r *http.Request) error { name := strings.TrimPrefix(r.URL.Path, v2adminMachinesPrefix) switch r.Method { case "GET": var info interface{} var err error if name != "" { - info, err = s.someMachineMessage(name) + info, err = p.someMachineMessage(name) } else { - info, err = s.allMachineMessages() + info, err = p.allMachineMessages() } if err != nil { return err @@ -75,8 +75,8 @@ func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) erro w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(info) case "PUT": - if !s.node.IsLeader() { - return s.redirect(w, r, s.node.Leader()) + if !p.node.IsLeader() { + return p.redirect(w, r, p.node.Leader()) } id, err := strconv.ParseInt(name, 0, 64) if err != nil { @@ -86,60 +86,60 @@ func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) erro if err := json.NewDecoder(r.Body).Decode(info); err != nil { return err } - return s.Add(id, info.PeerURL, info.ClientURL) + return p.add(id, info.PeerURL, info.ClientURL) case "DELETE": - if !s.node.IsLeader() { - return s.redirect(w, r, s.node.Leader()) + if !p.node.IsLeader() { + return p.redirect(w, r, p.node.Leader()) } id, err := strconv.ParseInt(name, 0, 64) if err != nil { return err } - return s.Remove(id) + return p.remove(id) default: return allow(w, "GET", "PUT", "DELETE") } return nil } -func (s *Server) ClusterConfig() *config.ClusterConfig { +func (p *participant) clusterConfig() *config.ClusterConfig { c := config.NewClusterConfig() // This is used for backward compatibility because it doesn't // set cluster config in older version. - if e, err := s.Get(v2configKVPrefix, false, false); err == nil { + if e, err := p.Get(v2configKVPrefix, false, false); err == nil { json.Unmarshal([]byte(*e.Node.Value), c) } return c } -func (s *Server) setClusterConfig(c *config.ClusterConfig) error { +func (p *participant) setClusterConfig(c *config.ClusterConfig) error { b, err := json.Marshal(c) if err != nil { return err } - if _, err := s.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil { + if _, err := p.Set(v2configKVPrefix, false, string(b), store.Permanent); err != nil { return err } return nil } // someMachineMessage return machine message of specified name. -func (s *Server) someMachineMessage(name string) (*machineMessage, error) { - p := filepath.Join(v2machineKVPrefix, name) - e, err := s.Get(p, false, false) +func (p *participant) someMachineMessage(name string) (*machineMessage, error) { + pp := filepath.Join(v2machineKVPrefix, name) + e, err := p.Get(pp, false, false) if err != nil { return nil, err } - lead := fmt.Sprint(s.node.Leader()) + lead := fmt.Sprint(p.node.Leader()) return newMachineMessage(e.Node, lead), nil } -func (s *Server) allMachineMessages() ([]*machineMessage, error) { - e, err := s.Get(v2machineKVPrefix, false, false) +func (p *participant) allMachineMessages() ([]*machineMessage, error) { + e, err := p.Get(v2machineKVPrefix, false, false) if err != nil { return nil, err } - lead := fmt.Sprint(s.node.Leader()) + lead := fmt.Sprint(p.node.Leader()) ms := make([]*machineMessage, len(e.Node.Nodes)) for i, n := range e.Node.Nodes { ms[i] = newMachineMessage(n, lead) diff --git a/etcd/v2_apply.go b/etcd/v2_apply.go index 0e2361956..d344af060 100644 --- a/etcd/v2_apply.go +++ b/etcd/v2_apply.go @@ -9,7 +9,7 @@ import ( "github.com/coreos/etcd/store" ) -func (s *Server) v2apply(index int64, ent raft.Entry) { +func (p *participant) v2apply(index int64, ent raft.Entry) { var ret interface{} var e *store.Event var err error @@ -22,36 +22,36 @@ func (s *Server) v2apply(index int64, ent raft.Entry) { switch cmd.Type { case "set": - e, err = s.Store.Set(cmd.Key, cmd.Dir, cmd.Value, cmd.Time) + e, err = p.Store.Set(cmd.Key, cmd.Dir, cmd.Value, cmd.Time) case "update": - e, err = s.Store.Update(cmd.Key, cmd.Value, cmd.Time) + e, err = p.Store.Update(cmd.Key, cmd.Value, cmd.Time) case "create", "unique": - e, err = s.Store.Create(cmd.Key, cmd.Dir, cmd.Value, cmd.Unique, cmd.Time) + e, err = p.Store.Create(cmd.Key, cmd.Dir, cmd.Value, cmd.Unique, cmd.Time) case "delete": - e, err = s.Store.Delete(cmd.Key, cmd.Dir, cmd.Recursive) + e, err = p.Store.Delete(cmd.Key, cmd.Dir, cmd.Recursive) case "cad": - e, err = s.Store.CompareAndDelete(cmd.Key, cmd.PrevValue, cmd.PrevIndex) + e, err = p.Store.CompareAndDelete(cmd.Key, cmd.PrevValue, cmd.PrevIndex) case "cas": - e, err = s.Store.CompareAndSwap(cmd.Key, cmd.PrevValue, cmd.PrevIndex, cmd.Value, cmd.Time) + e, err = p.Store.CompareAndSwap(cmd.Key, cmd.PrevValue, cmd.PrevIndex, cmd.Value, cmd.Time) case "sync": - s.Store.DeleteExpiredKeys(cmd.Time) + p.Store.DeleteExpiredKeys(cmd.Time) return default: log.Println("unexpected command type:", cmd.Type) } - if ent.Term > s.node.term { - s.node.term = ent.Term - for k, v := range s.node.result { - if k.term < s.node.term { + if ent.Term > p.node.term { + p.node.term = ent.Term + for k, v := range p.node.result { + if k.term < p.node.term { v <- fmt.Errorf("proposal lost due to leader election") - delete(s.node.result, k) + delete(p.node.result, k) } } } w := wait{index, ent.Term} - if s.node.result[w] == nil { + if p.node.result[w] == nil { return } @@ -60,6 +60,6 @@ func (s *Server) v2apply(index int64, ent raft.Entry) { } else { ret = e } - s.node.result[w] <- ret - delete(s.node.result, w) + p.node.result[w] <- ret + delete(p.node.result, w) } diff --git a/etcd/v2_http.go b/etcd/v2_http.go index 8c7593f99..b6854a82a 100644 --- a/etcd/v2_http.go +++ b/etcd/v2_http.go @@ -10,28 +10,28 @@ import ( etcdErr "github.com/coreos/etcd/error" ) -func (s *Server) serveValue(w http.ResponseWriter, r *http.Request) error { +func (p *participant) serveValue(w http.ResponseWriter, r *http.Request) error { switch r.Method { case "GET": - return s.GetHandler(w, r) + return p.GetHandler(w, r) case "HEAD": w = &HEADResponseWriter{w} - return s.GetHandler(w, r) + return p.GetHandler(w, r) case "PUT": - return s.PutHandler(w, r) + return p.PutHandler(w, r) case "POST": - return s.PostHandler(w, r) + return p.PostHandler(w, r) case "DELETE": - return s.DeleteHandler(w, r) + return p.DeleteHandler(w, r) } return allow(w, "GET", "PUT", "POST", "DELETE", "HEAD") } -func (s *Server) serveMachines(w http.ResponseWriter, r *http.Request) error { +func (p *participant) serveMachines(w http.ResponseWriter, r *http.Request) error { if r.Method != "GET" { return allow(w, "GET") } - v, err := s.Store.Get(v2machineKVPrefix, false, false) + v, err := p.Store.Get(v2machineKVPrefix, false, false) if err != nil { panic(err) } @@ -47,20 +47,20 @@ func (s *Server) serveMachines(w http.ResponseWriter, r *http.Request) error { return nil } -func (s *Server) serveLeader(w http.ResponseWriter, r *http.Request) error { +func (p *participant) serveLeader(w http.ResponseWriter, r *http.Request) error { if r.Method != "GET" { return allow(w, "GET") } - if p, ok := s.peerHub.peers[s.node.Leader()]; ok { + if p, ok := p.peerHub.peers[p.node.Leader()]; ok { w.Write([]byte(p.url)) return nil } return fmt.Errorf("no leader") } -func (s *Server) serveStoreStats(w http.ResponseWriter, req *http.Request) error { +func (p *participant) serveStoreStats(w http.ResponseWriter, req *http.Request) error { w.Header().Set("Content-Type", "application/json") - w.Write(s.Store.JsonStats()) + w.Write(p.Store.JsonStats()) return nil } @@ -99,8 +99,8 @@ func (w *HEADResponseWriter) Write([]byte) (int, error) { return 0, nil } -func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) error { - e, err := s.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, s.node.Leader()), false, false) +func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64) error { + e, err := p.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, p.node.Leader()), false, false) if err != nil { log.Println("redirect cannot find node", id) return fmt.Errorf("redirect cannot find node %d", id) @@ -111,7 +111,7 @@ func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) erro return fmt.Errorf("failed to parse node entry: %s", *e.Node.Value) } - redirectAddr, err := s.buildRedirectURL(m["etcd"][0], r.URL) + redirectAddr, err := buildRedirectURL(m["etcd"][0], r.URL) if err != nil { log.Println("redirect cannot build new url:", err) return err @@ -121,7 +121,7 @@ func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int64) erro return nil } -func (s *Server) buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) { +func buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) { redirectURL, err := url.Parse(redirectAddr) if err != nil { return "", fmt.Errorf("redirect cannot parse url: %v", err) diff --git a/etcd/v2_http_delete.go b/etcd/v2_http_delete.go index 6e7118ecf..02f70a896 100644 --- a/etcd/v2_http_delete.go +++ b/etcd/v2_http_delete.go @@ -8,9 +8,9 @@ import ( etcdErr "github.com/coreos/etcd/error" ) -func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error { - if !s.node.IsLeader() { - return s.redirect(w, req, s.node.Leader()) +func (p *participant) DeleteHandler(w http.ResponseWriter, req *http.Request) error { + if !p.node.IsLeader() { + return p.redirect(w, req, p.node.Leader()) } key := req.URL.Path[len("/v2/keys"):] @@ -23,7 +23,7 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error { _, indexOk := req.Form["prevIndex"] if !valueOk && !indexOk { - return s.serveDelete(w, req, key, dir, recursive) + return p.serveDelete(w, req, key, dir, recursive) } var err error @@ -36,32 +36,32 @@ func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error { // bad previous index if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", p.Store.Index()) } } if valueOk { if prevValue == "" { - return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", p.Store.Index()) } } - return s.serveCAD(w, req, key, prevValue, prevIndex) + return p.serveCAD(w, req, key, prevValue, prevIndex) } -func (s *Server) serveDelete(w http.ResponseWriter, req *http.Request, key string, dir, recursive bool) error { - ret, err := s.Delete(key, dir, recursive) +func (p *participant) serveDelete(w http.ResponseWriter, req *http.Request, key string, dir, recursive bool) error { + ret, err := p.Delete(key, dir, recursive) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("delete:", err) return err } -func (s *Server) serveCAD(w http.ResponseWriter, req *http.Request, key string, prevValue string, prevIndex uint64) error { - ret, err := s.CAD(key, prevValue, prevIndex) +func (p *participant) serveCAD(w http.ResponseWriter, req *http.Request, key string, prevValue string, prevIndex uint64) error { + ret, err := p.CAD(key, prevValue, prevIndex) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("cad:", err) diff --git a/etcd/v2_http_endpoint_test.go b/etcd/v2_http_endpoint_test.go index 62db3645b..4027d00a6 100644 --- a/etcd/v2_http_endpoint_test.go +++ b/etcd/v2_http_endpoint_test.go @@ -200,7 +200,7 @@ func TestPutAdminConfigEndPoint(t *testing.T) { barrier(t, 0, es) for j := range es { - e, err := es[j].Get(v2configKVPrefix, false, false) + e, err := es[j].p.Get(v2configKVPrefix, false, false) if err != nil { t.Errorf("%v", err) continue @@ -321,17 +321,17 @@ func TestGetAdminMachinesEndPoint(t *testing.T) { // barrier ensures that all servers have made further progress on applied index // compared to the base one. func barrier(t *testing.T, base int, es []*Server) { - applied := es[base].node.Applied() + applied := es[base].p.node.Applied() // time used for goroutine scheduling time.Sleep(5 * time.Millisecond) for i, e := range es { for j := 0; ; j++ { - if e.node.Applied() >= applied { + if e.p.node.Applied() >= applied { break } time.Sleep(defaultHeartbeat * defaultTickDuration) if j == 2 { - t.Fatalf("#%d: applied = %d, want >= %d", i, e.node.Applied(), applied) + t.Fatalf("#%d: applied = %d, want >= %d", i, e.p.node.Applied(), applied) } } } diff --git a/etcd/v2_http_get.go b/etcd/v2_http_get.go index 8b9b1670a..3e17f8a30 100644 --- a/etcd/v2_http_get.go +++ b/etcd/v2_http_get.go @@ -9,7 +9,7 @@ import ( etcdErr "github.com/coreos/etcd/error" ) -func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error { +func (p *participant) GetHandler(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v2/keys"):] // TODO(xiangli): handle consistent get recursive := (req.FormValue("recursive") == "true") @@ -17,12 +17,12 @@ func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error { waitIndex := req.FormValue("waitIndex") stream := (req.FormValue("stream") == "true") if req.FormValue("wait") == "true" { - return s.handleWatch(key, recursive, stream, waitIndex, w, req) + return p.handleWatch(key, recursive, stream, waitIndex, w, req) } - return s.handleGet(key, recursive, sort, w, req) + return p.handleGet(key, recursive, sort, w, req) } -func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error { +func (p *participant) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error { // Create a command to watch from a given index (default 0). var sinceIndex uint64 = 0 var err error @@ -30,11 +30,11 @@ func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex strin if waitIndex != "" { sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64) if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", p.Store.Index()) } } - watcher, err := s.Store.Watch(key, recursive, stream, sinceIndex) + watcher, err := p.Store.Watch(key, recursive, stream, sinceIndex) if err != nil { return err } @@ -42,7 +42,7 @@ func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex strin cn, _ := w.(http.CloseNotifier) closeChan := cn.CloseNotify() - s.writeHeaders(w) + p.writeHeaders(w) if stream { // watcher hub will not help to remove stream watcher @@ -86,12 +86,12 @@ func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex strin return nil } -func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error { - event, err := s.Store.Get(key, recursive, sort) +func (p *participant) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error { + event, err := p.Store.Get(key, recursive, sort) if err != nil { return err } - s.writeHeaders(w) + p.writeHeaders(w) if req.Method == "HEAD" { return nil } @@ -103,9 +103,9 @@ func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWrit return nil } -func (s *Server) writeHeaders(w http.ResponseWriter) { +func (p *participant) writeHeaders(w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json") - w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store.Index())) + w.Header().Add("X-Etcd-Index", fmt.Sprint(p.Store.Index())) // TODO(xiangli): raft-index and term w.WriteHeader(http.StatusOK) } diff --git a/etcd/v2_http_post.go b/etcd/v2_http_post.go index 02e02f84e..2c7473416 100644 --- a/etcd/v2_http_post.go +++ b/etcd/v2_http_post.go @@ -8,9 +8,9 @@ import ( "github.com/coreos/etcd/store" ) -func (s *Server) PostHandler(w http.ResponseWriter, req *http.Request) error { - if !s.node.IsLeader() { - return s.redirect(w, req, s.node.Leader()) +func (p *participant) PostHandler(w http.ResponseWriter, req *http.Request) error { + if !p.node.IsLeader() { + return p.redirect(w, req, p.node.Leader()) } key := req.URL.Path[len("/v2/keys"):] @@ -19,12 +19,12 @@ func (s *Server) PostHandler(w http.ResponseWriter, req *http.Request) error { dir := (req.FormValue("dir") == "true") expireTime, err := store.TTL(req.FormValue("ttl")) if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", p.Store.Index()) } - ret, err := s.Create(key, dir, value, expireTime, true) + ret, err := p.Create(key, dir, value, expireTime, true) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("unique:", err) diff --git a/etcd/v2_http_put.go b/etcd/v2_http_put.go index 7804323b2..a3bd50b69 100644 --- a/etcd/v2_http_put.go +++ b/etcd/v2_http_put.go @@ -13,9 +13,9 @@ import ( "github.com/coreos/etcd/store" ) -func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { - if !s.node.IsLeader() { - return s.redirect(w, req, s.node.Leader()) +func (p *participant) PutHandler(w http.ResponseWriter, req *http.Request) error { + if !p.node.IsLeader() { + return p.redirect(w, req, p.node.Leader()) } key := req.URL.Path[len("/v2/keys"):] @@ -27,7 +27,7 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { expireTime, err := store.TTL(req.Form.Get("ttl")) if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", p.Store.Index()) } prevValue, valueOk := firstValue(req.Form, "prevValue") @@ -36,7 +36,7 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { // Set handler: create a new node or replace the old one. if !valueOk && !indexOk && !existOk { - return s.serveSet(w, req, key, dir, value, expireTime) + return p.serveSet(w, req, key, dir, value, expireTime) } // update with test @@ -44,11 +44,11 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { if prevExist == "false" { // Create command: create a new node. Fail, if a node already exists // Ignore prevIndex and prevValue - return s.serveCreate(w, req, key, dir, value, expireTime) + return p.serveCreate(w, req, key, dir, value, expireTime) } if prevExist == "true" && !indexOk && !valueOk { - return s.serveUpdate(w, req, key, value, expireTime) + return p.serveUpdate(w, req, key, value, expireTime) } } @@ -59,7 +59,7 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { // bad previous index if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", p.Store.Index()) } } else { prevIndex = 0 @@ -67,22 +67,22 @@ func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { if valueOk { if prevValue == "" { - return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", p.Store.Index()) } } - return s.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime) + return p.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime) } -func (s *Server) handleRet(w http.ResponseWriter, ret *store.Event) { +func (p *participant) handleRet(w http.ResponseWriter, ret *store.Event) { b, _ := json.Marshal(ret) w.Header().Set("Content-Type", "application/json") // etcd index should be the same as the event index // which is also the last modified index of the node w.Header().Add("X-Etcd-Index", fmt.Sprint(ret.Index())) - // w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) - // w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) + // w.Header().Add("X-Raft-Index", fmt.Sprint(p.CommitIndex())) + // w.Header().Add("X-Raft-Term", fmt.Sprint(p.Term())) if ret.IsCreated() { w.WriteHeader(http.StatusCreated) @@ -93,44 +93,44 @@ func (s *Server) handleRet(w http.ResponseWriter, ret *store.Event) { w.Write(b) } -func (s *Server) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error { - ret, err := s.Set(key, dir, value, expireTime) +func (p *participant) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error { + ret, err := p.Set(key, dir, value, expireTime) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("set:", err) return err } -func (s *Server) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error { - ret, err := s.Create(key, dir, value, expireTime, false) +func (p *participant) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error { + ret, err := p.Create(key, dir, value, expireTime, false) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("create:", err) return err } -func (s *Server) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error { +func (p *participant) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error { // Update should give at least one option if value == "" && expireTime.Sub(store.Permanent) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store.Index()) + return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", p.Store.Index()) } - ret, err := s.Update(key, value, expireTime) + ret, err := p.Update(key, value, expireTime) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("update:", err) return err } -func (s *Server) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error { - ret, err := s.CAS(key, value, prevValue, prevIndex, expireTime) +func (p *participant) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error { + ret, err := p.CAS(key, value, prevValue, prevIndex, expireTime) if err == nil { - s.handleRet(w, ret) + p.handleRet(w, ret) return nil } log.Println("update:", err) diff --git a/etcd/v2_standby.go b/etcd/v2_standby.go deleted file mode 100644 index 95a462d10..000000000 --- a/etcd/v2_standby.go +++ /dev/null @@ -1,47 +0,0 @@ -package etcd - -import ( - "fmt" - "net/http" - "strconv" -) - -func (s *Server) serveRedirect(w http.ResponseWriter, r *http.Request) error { - if s.leader == noneId { - return fmt.Errorf("no leader in the cluster") - } - redirectAddr, err := s.buildRedirectURL(s.leaderAddr, r.URL) - if err != nil { - return err - } - http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect) - return nil -} - -func (s *Server) syncCluster() error { - for node := range s.nodes { - machines, err := s.client.GetMachines(node) - if err != nil { - continue - } - config, err := s.client.GetClusterConfig(node) - if err != nil { - continue - } - s.nodes = make(map[string]bool) - for _, machine := range machines { - s.nodes[machine.PeerURL] = true - if machine.State == stateLeader { - id, err := strconv.ParseInt(machine.Name, 0, 64) - if err != nil { - return err - } - s.leader = id - s.leaderAddr = machine.PeerURL - } - } - s.clusterConf = config - return nil - } - return fmt.Errorf("unreachable cluster") -} diff --git a/etcd/v2_store.go b/etcd/v2_store.go index 1c044daac..0381013ab 100644 --- a/etcd/v2_store.go +++ b/etcd/v2_store.go @@ -20,57 +20,54 @@ type cmd struct { Time time.Time } -func (s *Server) Set(key string, dir bool, value string, expireTime time.Time) (*store.Event, error) { +func (p *participant) Set(key string, dir bool, value string, expireTime time.Time) (*store.Event, error) { set := &cmd{Type: "set", Key: key, Dir: dir, Value: value, Time: expireTime} - return s.do(set) + return p.do(set) } -func (s *Server) Create(key string, dir bool, value string, expireTime time.Time, unique bool) (*store.Event, error) { +func (p *participant) Create(key string, dir bool, value string, expireTime time.Time, unique bool) (*store.Event, error) { create := &cmd{Type: "create", Key: key, Dir: dir, Value: value, Time: expireTime, Unique: unique} - return s.do(create) + return p.do(create) } -func (s *Server) Update(key string, value string, expireTime time.Time) (*store.Event, error) { +func (p *participant) Update(key string, value string, expireTime time.Time) (*store.Event, error) { update := &cmd{Type: "update", Key: key, Value: value, Time: expireTime} - return s.do(update) + return p.do(update) } -func (s *Server) CAS(key, value, prevValue string, prevIndex uint64, expireTime time.Time) (*store.Event, error) { +func (p *participant) CAS(key, value, prevValue string, prevIndex uint64, expireTime time.Time) (*store.Event, error) { cas := &cmd{Type: "cas", Key: key, Value: value, PrevValue: prevValue, PrevIndex: prevIndex, Time: expireTime} - return s.do(cas) + return p.do(cas) } -func (s *Server) Delete(key string, dir, recursive bool) (*store.Event, error) { +func (p *participant) Delete(key string, dir, recursive bool) (*store.Event, error) { d := &cmd{Type: "delete", Key: key, Dir: dir, Recursive: recursive} - return s.do(d) + return p.do(d) } -func (s *Server) CAD(key string, prevValue string, prevIndex uint64) (*store.Event, error) { +func (p *participant) CAD(key string, prevValue string, prevIndex uint64) (*store.Event, error) { cad := &cmd{Type: "cad", Key: key, PrevValue: prevValue, PrevIndex: prevIndex} - return s.do(cad) + return p.do(cad) } -func (s *Server) do(c *cmd) (*store.Event, error) { +func (p *participant) do(c *cmd) (*store.Event, error) { data, err := json.Marshal(c) if err != nil { panic(err) } - p := v2Proposal{ + pp := v2Proposal{ data: data, ret: make(chan interface{}, 1), } - if s.mode != participant { - return nil, raftStopErr - } select { - case s.proposal <- p: + case p.proposal <- pp: default: return nil, fmt.Errorf("unable to send out the proposal") } - switch t := (<-p.ret).(type) { + switch t := (<-pp.ret).(type) { case *store.Event: return t, nil case error: