From 5e3fd6ee3fa7d0b00d8507e0ce78a5469d3af857 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 24 Sep 2014 19:51:27 -0700 Subject: [PATCH] etcdserver: introduce the cluster and member This introduces two new concepts: the cluster and the member. Members are logical etcd instances that have a name, raft ID, and a list of peer and client addresses. A cluster is made up of a list of members. --- Procfile | 8 +- etcdserver/cluster.go | 114 ++++++++++++++ etcdserver/cluster_store.go | 141 +++++++++++++++++ etcdserver/cluster_store_test.go | 115 ++++++++++++++ etcdserver/cluster_test.go | 143 +++++++++++++++++ etcdserver/etcdhttp/http.go | 20 +-- etcdserver/etcdhttp/http_test.go | 83 ++++++---- etcdserver/etcdhttp/peers.go | 157 ------------------- etcdserver/etcdhttp/peers_test.go | 248 ------------------------------ etcdserver/member.go | 43 ++++++ etcdserver/server.go | 10 +- etcdserver/server_test.go | 6 +- main.go | 43 +++--- 13 files changed, 654 insertions(+), 477 deletions(-) create mode 100644 etcdserver/cluster.go create mode 100644 etcdserver/cluster_store.go create mode 100644 etcdserver/cluster_store_test.go create mode 100644 etcdserver/cluster_test.go delete mode 100644 etcdserver/etcdhttp/peers.go delete mode 100644 etcdserver/etcdhttp/peers_test.go create mode 100644 etcdserver/member.go diff --git a/Procfile b/Procfile index b7d6bb55a..4a2b65357 100644 --- a/Procfile +++ b/Procfile @@ -1,5 +1,5 @@ # Use goreman to run `go get github.com/mattn/goreman` -etcd1: bin/etcd -id 0x1 -bind-addr 127.0.0.1:4001 -peer-bind-addr :7001 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003' -etcd2: bin/etcd -id 0x2 -bind-addr 127.0.0.1:4002 -peer-bind-addr :7002 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003' -etcd3: bin/etcd -id 0x3 -bind-addr 127.0.0.1:4003 -peer-bind-addr :7003 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003' -proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003' +etcd1: bin/etcd -name node1 -bind-addr 127.0.0.1:4001 -peer-bind-addr :7001 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003' +etcd2: bin/etcd -name node2 -bind-addr 127.0.0.1:4002 -peer-bind-addr :7002 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003' +etcd3: bin/etcd -name node3 -bind-addr 127.0.0.1:4003 -peer-bind-addr :7003 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003' +#proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers 'localhost:7001,localhost:7002,localhost:7003' diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go new file mode 100644 index 000000000..793b63bad --- /dev/null +++ b/etcdserver/cluster.go @@ -0,0 +1,114 @@ +package etcdserver + +import ( + "fmt" + "math/rand" + "net/url" + "sort" + "strings" +) + +// Cluster is a list of Members that belong to the same raft cluster +type Cluster map[int64]*Member + +func (c Cluster) FindID(id int64) *Member { + return c[id] +} + +func (c Cluster) FindName(name string) *Member { + for _, m := range c { + if m.Name == name { + return m + } + } + + return nil +} + +func (c Cluster) Add(m Member) error { + if c.FindID(m.ID) != nil { + return fmt.Errorf("Member exists with identical ID %v", m) + } + c[m.ID] = &m + return nil +} + +func (c *Cluster) AddSlice(mems []Member) error { + for _, m := range mems { + err := c.Add(m) + if err != nil { + return err + } + } + + return nil +} + +// Pick chooses a random address from a given Member's addresses, and returns it as +// an addressible URI. If the given member does not exist, an empty string is returned. +func (c Cluster) Pick(id int64) string { + if m := c.FindID(id); m != nil { + addrs := m.PeerURLs + if len(addrs) == 0 { + return "" + } + return addrs[rand.Intn(len(addrs))] + } + + return "" +} + +// Set parses command line sets of names to IPs formatted like: +// mach0=1.1.1.1,mach0=2.2.2.2,mach0=1.1.1.1,mach1=2.2.2.2,mach1=3.3.3.3 +func (c *Cluster) Set(s string) error { + *c = Cluster{} + v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1)) + if err != nil { + return err + } + + for name, urls := range v { + if len(urls) == 0 || urls[0] == "" { + return fmt.Errorf("Empty URL given for %q", name) + } + m := newMember(name, urls) + err := c.Add(*m) + if err != nil { + return err + } + } + return nil +} + +func (c Cluster) String() string { + sl := []string{} + for _, m := range c { + for _, u := range m.PeerURLs { + sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u)) + } + } + sort.Strings(sl) + return strings.Join(sl, ",") +} + +func (c Cluster) IDs() []int64 { + var ids []int64 + for _, m := range c { + ids = append(ids, m.ID) + } + return ids +} + +// Endpoints returns a list of all peer addresses. Each address is prefixed +// with the scheme (currently "http://"). The returned list is sorted in +// ascending lexicographical order. +func (c Cluster) Endpoints() []string { + endpoints := make([]string, 0) + for _, p := range c { + for _, addr := range p.PeerURLs { + endpoints = append(endpoints, addScheme(addr)) + } + } + sort.Strings(endpoints) + return endpoints +} diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go new file mode 100644 index 000000000..12904d1dd --- /dev/null +++ b/etcdserver/cluster_store.go @@ -0,0 +1,141 @@ +package etcdserver + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/store" +) + +const ( + raftPrefix = "/raft" +) + +type ClusterStore interface { + Get() Cluster + Delete(id int64) +} + +type clusterStore struct { + Store store.Store +} + +func NewClusterStore(st store.Store, c Cluster) ClusterStore { + cls := &clusterStore{Store: st} + for _, m := range c { + cls.add(*m) + } + return cls +} + +// add puts a new Member into the store. +// A Member with a matching id must not exist. +func (s *clusterStore) add(m Member) { + b, err := json.Marshal(m) + if err != nil { + log.Panicf("marshal peer info error: %v", err) + } + + if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil { + log.Panicf("add member should never fail: %v", err) + } +} + +// TODO(philips): keep the latest copy without going to the store to avoid the +// lock here. +func (s *clusterStore) Get() Cluster { + c := &Cluster{} + e, err := s.Store.Get(machineKVPrefix, true, false) + if err != nil { + log.Panicf("get member should never fail: %v", err) + } + for _, n := range e.Node.Nodes { + m := Member{} + if err := json.Unmarshal([]byte(*n.Value), &m); err != nil { + log.Panicf("unmarshal peer error: %v", err) + } + err := c.Add(m) + if err != nil { + log.Panicf("add member to cluster should never fail: %v", err) + } + } + return *c +} + +// Delete removes a member from the store. +// The given id MUST exist. +func (s *clusterStore) Delete(id int64) { + p := s.Get().FindID(id).storeKey() + if _, err := s.Store.Delete(p, false, false); err != nil { + log.Panicf("delete peer should never fail: %v", err) + } +} + +// addScheme adds the protocol prefix to a string; currently only HTTP +// TODO: improve this when implementing TLS +func addScheme(addr string) string { + return fmt.Sprintf("http://%s", addr) +} + +func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) { + c := &http.Client{Transport: t} + + scheme := "http" + if t.TLSClientConfig != nil { + scheme = "https" + } + + return func(msgs []raftpb.Message) { + for _, m := range msgs { + // TODO: reuse go routines + // limit the number of outgoing connections for the same receiver + go send(c, scheme, cls, m) + } + } +} + +func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) { + // TODO (xiangli): reasonable retry logic + for i := 0; i < 3; i++ { + addr := cls.Get().Pick(m.To) + if addr == "" { + // TODO: unknown peer id.. what do we do? I + // don't think his should ever happen, need to + // look into this further. + log.Printf("etcdhttp: no addr for %d", m.To) + return + } + + url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix) + + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data, err := m.Marshal() + if err != nil { + log.Println("etcdhttp: dropping message:", err) + return // drop bad message + } + if httpPost(c, url, data) { + return // success + } + // TODO: backoff + } +} + +func httpPost(c *http.Client, url string, data []byte) bool { + resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data)) + if err != nil { + // TODO: log the error? + return false + } + resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + // TODO: log the error? + return false + } + return true +} diff --git a/etcdserver/cluster_store_test.go b/etcdserver/cluster_store_test.go new file mode 100644 index 000000000..aeec05e85 --- /dev/null +++ b/etcdserver/cluster_store_test.go @@ -0,0 +1,115 @@ +package etcdserver + +import ( + "reflect" + "testing" + "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" +) + +func TestClusterStoreGet(t *testing.T) { + tests := []struct { + mems []Member + wmems []Member + }{ + { + []Member{{Name: "node1", ID: 1}}, + []Member{{Name: "node1", ID: 1}}, + }, + { + []Member{}, + []Member{}, + }, + { + []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + }, + { + []Member{{Name: "node2", ID: 2}, {Name: "node1", ID: 1}}, + []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + }, + } + for i, tt := range tests { + c := Cluster{} + err := c.AddSlice(tt.mems) + if err != nil { + t.Error(err) + } + + cs := NewClusterStore(&getAllStore{}, c) + + if g := cs.Get(); !reflect.DeepEqual(g, c) { + t.Errorf("#%d: mems = %v, want %v", i, g, c) + } + } +} + +func TestClusterStoreDelete(t *testing.T) { + st := &storeGetAllDeleteRecorder{} + c := Cluster{} + c.Add(Member{Name: "node", ID: 1}) + cs := NewClusterStore(st, c) + cs.Delete(1) + + wdeletes := []string{machineKVPrefix + "1"} + if !reflect.DeepEqual(st.deletes, wdeletes) { + t.Error("deletes = %v, want %v", st.deletes, wdeletes) + } +} + +// simpleStore implements basic create and get. +type simpleStore struct { + storeRecorder + st map[string]string +} + +func (s *simpleStore) Create(key string, _ bool, value string, _ bool, _ time.Time) (*store.Event, error) { + if s.st == nil { + s.st = make(map[string]string) + } + s.st[key] = value + return nil, nil +} +func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) { + val, ok := s.st[key] + if !ok { + return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0) + } + ev := &store.Event{Node: &store.NodeExtern{Key: key, Value: stringp(val)}} + return ev, nil +} + +// getAllStore inherits simpleStore, and makes Get return all keys. +type getAllStore struct { + simpleStore +} + +func (s *getAllStore) Get(_ string, _, _ bool) (*store.Event, error) { + nodes := make([]*store.NodeExtern, 0) + for k, v := range s.st { + nodes = append(nodes, &store.NodeExtern{Key: k, Value: stringp(v)}) + } + return &store.Event{Node: &store.NodeExtern{Nodes: nodes}}, nil +} + +type storeDeleteRecorder struct { + storeRecorder + deletes []string +} + +func (s *storeDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) { + s.deletes = append(s.deletes, key) + return nil, nil +} + +type storeGetAllDeleteRecorder struct { + getAllStore + deletes []string +} + +func (s *storeGetAllDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) { + s.deletes = append(s.deletes, key) + return nil, nil +} diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go new file mode 100644 index 000000000..7ddf6d461 --- /dev/null +++ b/etcdserver/cluster_test.go @@ -0,0 +1,143 @@ +package etcdserver + +import ( + "testing" +) + +func TestClusterFind(t *testing.T) { + tests := []struct { + id int64 + name string + mems []Member + match bool + }{ + { + 1, + "node1", + []Member{{Name: "node1", ID: 1}}, + true, + }, + { + 2, + "foobar", + []Member{}, + false, + }, + { + 2, + "node2", + []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + true, + }, + { + 3, + "node3", + []Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}}, + false, + }, + } + for i, tt := range tests { + c := Cluster{} + c.AddSlice(tt.mems) + + m := c.FindName(tt.name) + if m == nil && !tt.match { + continue + } + if m == nil && tt.match { + t.Errorf("#%d: expected match got empty", i) + } + if m.Name != tt.name && tt.match { + t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.name) + } + } + + for i, tt := range tests { + c := Cluster{} + c.AddSlice(tt.mems) + + m := c.FindID(tt.id) + if m == nil && !tt.match { + continue + } + if m == nil && tt.match { + t.Errorf("#%d: expected match got empty", i) + } + if m.ID != tt.id && tt.match { + t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.id) + } + } +} + +func TestClusterSet(t *testing.T) { + tests := []struct { + f string + mems []Member + parse bool + }{ + { + "mem1=10.0.0.1:2379,mem1=128.193.4.20:2379,mem2=10.0.0.2:2379,default=127.0.0.1:2379", + []Member{ + {ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"10.0.0.1:2379", "128.193.4.20:2379"}}, + {ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"10.0.0.2:2379"}}, + {ID: 2676999861503984872, Name: "default", PeerURLs: []string{"127.0.0.1:2379"}}, + }, + true, + }, + } + for i, tt := range tests { + c := Cluster{} + err := c.AddSlice(tt.mems) + if err != nil { + t.Error(err) + } + + g := Cluster{} + g.Set(tt.f) + + if g.String() != c.String() { + t.Errorf("#%d: set = %v, want %v", i, g, c) + } + } +} + +func TestClusterSetBad(t *testing.T) { + tests := []string{ + "mem1=,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379", + "mem1,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379", + // TODO(philips): anyone know of a 64 bit sha1 hash collision + // "06b2f82fd81b2c20=128.193.4.20:2379,02c60cb75083ceef=128.193.4.20:2379", + } + for i, tt := range tests { + g := Cluster{} + err := g.Set(tt) + if err == nil { + t.Errorf("#%d: set = %v, want err", i, tt) + } + } +} + +func TestClusterAddBad(t *testing.T) { + tests := []struct { + mems []Member + }{ + { + []Member{ + {ID: 1, Name: "mem1"}, + {ID: 1, Name: "mem2"}, + }, + }, + } + + c := &Cluster{} + c.Add(Member{ID: 1, Name: "mem1"}) + + for i, tt := range tests { + for _, m := range tt.mems { + err := c.Add(m) + if err == nil { + t.Errorf("#%d: set = %v, want err", i, m) + } + } + } +} diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index b061a6295..db9e4249f 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -35,12 +35,12 @@ const ( var errClosed = errors.New("etcdhttp: client closed connection") // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. -func NewClientHandler(server *etcdserver.EtcdServer, peers Peers, timeout time.Duration) http.Handler { +func NewClientHandler(server *etcdserver.EtcdServer, clusterStore etcdserver.ClusterStore, timeout time.Duration) http.Handler { sh := &serverHandler{ - server: server, - peers: peers, - timer: server, - timeout: timeout, + server: server, + clusterStore: clusterStore, + timer: server, + timeout: timeout, } if sh.timeout == 0 { sh.timeout = defaultServerTimeout @@ -68,10 +68,10 @@ func NewPeerHandler(server etcdserver.Server) http.Handler { // serverHandler provides http.Handlers for etcd client and raft communication. type serverHandler struct { - timeout time.Duration - server etcdserver.Server - timer etcdserver.RaftTimer - peers Peers + timeout time.Duration + server etcdserver.Server + timer etcdserver.RaftTimer + clusterStore etcdserver.ClusterStore } func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { @@ -115,7 +115,7 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "HEAD") { return } - endpoints := h.peers.Endpoints() + endpoints := h.clusterStore.Get().Endpoints() w.Write([]byte(strings.Join(endpoints, ", "))) } diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 8667567cc..7e60e53bb 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -589,7 +589,7 @@ func TestV2MachinesEndpoint(t *testing.T) { {"POST", http.StatusMethodNotAllowed}, } - m := NewClientHandler(nil, Peers{}, time.Hour) + m := NewClientHandler(nil, &fakeCluster{}, time.Hour) s := httptest.NewServer(m) defer s.Close() @@ -610,15 +610,20 @@ func TestV2MachinesEndpoint(t *testing.T) { } func TestServeMachines(t *testing.T) { - peers := Peers{} - peers.Set("0xBEEF0=localhost:8080&0xBEEF1=localhost:8081&0xBEEF2=localhost:8082") + cluster := &fakeCluster{ + members: []etcdserver.Member{ + {ID: 0xBEEF0, PeerURLs: []string{"localhost:8080"}}, + {ID: 0xBEEF1, PeerURLs: []string{"localhost:8081"}}, + {ID: 0xBEEF2, PeerURLs: []string{"localhost:8082"}}, + }, + } writer := httptest.NewRecorder() req, err := http.NewRequest("GET", "", nil) if err != nil { t.Fatal(err) } - h := &serverHandler{peers: peers} + h := &serverHandler{clusterStore: cluster} h.serveMachines(writer, req) w := "http://localhost:8080, http://localhost:8081, http://localhost:8082" if g := writer.Body.String(); g != w { @@ -629,56 +634,64 @@ func TestServeMachines(t *testing.T) { } } -func TestPeersEndpoints(t *testing.T) { +func TestClusterGetEndpoints(t *testing.T) { tests := []struct { - peers Peers - endpoints []string + clusterStore etcdserver.ClusterStore + endpoints []string }{ // single peer with a single address { - peers: Peers(map[int64][]string{ - 1: []string{"192.0.2.1"}, - }), + clusterStore: &fakeCluster{ + members: []etcdserver.Member{ + {ID: 1, PeerURLs: []string{"192.0.2.1"}}, + }, + }, endpoints: []string{"http://192.0.2.1"}, }, // single peer with a single address with a port { - peers: Peers(map[int64][]string{ - 1: []string{"192.0.2.1:8001"}, - }), + clusterStore: &fakeCluster{ + members: []etcdserver.Member{ + {ID: 1, PeerURLs: []string{"192.0.2.1:8001"}}, + }, + }, endpoints: []string{"http://192.0.2.1:8001"}, }, - // several peers explicitly unsorted + // several members explicitly unsorted { - peers: Peers(map[int64][]string{ - 2: []string{"192.0.2.3", "192.0.2.4"}, - 3: []string{"192.0.2.5", "192.0.2.6"}, - 1: []string{"192.0.2.1", "192.0.2.2"}, - }), + clusterStore: &fakeCluster{ + members: []etcdserver.Member{ + {ID: 2, PeerURLs: []string{"192.0.2.3", "192.0.2.4"}}, + {ID: 3, PeerURLs: []string{"192.0.2.5", "192.0.2.6"}}, + {ID: 1, PeerURLs: []string{"192.0.2.1", "192.0.2.2"}}, + }, + }, endpoints: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"}, }, - // no peers + // no members { - peers: Peers(map[int64][]string{}), - endpoints: []string{}, + clusterStore: &fakeCluster{members: []etcdserver.Member{}}, + endpoints: []string{}, }, // peer with no endpoints { - peers: Peers(map[int64][]string{ - 3: []string{}, - }), + clusterStore: &fakeCluster{ + members: []etcdserver.Member{ + {ID: 3, PeerURLs: []string{}}, + }, + }, endpoints: []string{}, }, } for i, tt := range tests { - endpoints := tt.peers.Endpoints() + endpoints := tt.clusterStore.Get().Endpoints() if !reflect.DeepEqual(tt.endpoints, endpoints) { - t.Errorf("#%d: peers.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints) + t.Errorf("#%d: members.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints) } } } @@ -868,7 +881,6 @@ func TestServeRaft(t *testing.T) { h := &serverHandler{ timeout: time.Hour, server: &errServer{tt.serverErr}, - peers: nil, } rw := httptest.NewRecorder() h.serveRaft(rw, req) @@ -957,7 +969,6 @@ func TestBadServeKeys(t *testing.T) { h := &serverHandler{ timeout: 0, // context times out immediately server: tt.server, - peers: nil, } rw := httptest.NewRecorder() h.serveKeys(rw, tt.req) @@ -980,7 +991,6 @@ func TestServeKeysEvent(t *testing.T) { h := &serverHandler{ timeout: time.Hour, server: server, - peers: nil, timer: &dummyRaftTimer{}, } rw := httptest.NewRecorder() @@ -1019,7 +1029,6 @@ func TestServeKeysWatch(t *testing.T) { h := &serverHandler{ timeout: time.Hour, server: server, - peers: nil, timer: &dummyRaftTimer{}, } go func() { @@ -1295,3 +1304,15 @@ func TestHandleWatchStreaming(t *testing.T) { t.Fatalf("timed out waiting for done") } } + +type fakeCluster struct { + members []etcdserver.Member +} + +func (c *fakeCluster) Get() etcdserver.Cluster { + cl := &etcdserver.Cluster{} + cl.AddSlice(c.members) + return *cl +} + +func (c *fakeCluster) Delete(id int64) { return } diff --git a/etcdserver/etcdhttp/peers.go b/etcdserver/etcdhttp/peers.go deleted file mode 100644 index 5d406a5a8..000000000 --- a/etcdserver/etcdhttp/peers.go +++ /dev/null @@ -1,157 +0,0 @@ -package etcdhttp - -import ( - "bytes" - "fmt" - "log" - "math/rand" - "net/http" - "net/url" - "sort" - "strconv" - - "github.com/coreos/etcd/raft/raftpb" -) - -// Peers contains a mapping of unique IDs to a list of hostnames/IP addresses -type Peers map[int64][]string - -// addScheme adds the protocol prefix to a string; currently only HTTP -// TODO: improve this when implementing TLS -func addScheme(addr string) string { - return fmt.Sprintf("http://%s", addr) -} - -// Pick returns a random address from a given Peer's addresses. If the -// given peer does not exist, an empty string is returned. -func (ps Peers) Pick(id int64) string { - addrs := ps[id] - if len(addrs) == 0 { - return "" - } - return addrs[rand.Intn(len(addrs))] -} - -// Set parses command line sets of names to IPs formatted like: -// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2 -func (ps *Peers) Set(s string) error { - m := make(map[int64][]string) - v, err := url.ParseQuery(s) - if err != nil { - return err - } - for k, v := range v { - id, err := strconv.ParseInt(k, 0, 64) - if err != nil { - return err - } - m[id] = v - } - *ps = m - return nil -} - -func (ps *Peers) String() string { - v := url.Values{} - for k, vv := range *ps { - for i := range vv { - v.Add(strconv.FormatInt(k, 16), vv[i]) - } - } - return v.Encode() -} - -func (ps Peers) IDs() []int64 { - var ids []int64 - for id := range ps { - ids = append(ids, id) - } - return ids -} - -// Endpoints returns a list of all peer addresses. Each address is prefixed -// with the scheme (currently "http://"). The returned list is sorted in -// ascending lexicographical order. -func (ps Peers) Endpoints() []string { - endpoints := make([]string, 0) - for _, addrs := range ps { - for _, addr := range addrs { - endpoints = append(endpoints, addScheme(addr)) - } - } - sort.Strings(endpoints) - - return endpoints -} - -// Addrs returns a list of all peer addresses. The returned list is sorted -// in ascending lexicographical order. -func (ps Peers) Addrs() []string { - addrs := make([]string, 0) - for _, paddrs := range ps { - for _, paddr := range paddrs { - addrs = append(addrs, paddr) - } - } - sort.Strings(addrs) - return addrs -} - -func Sender(t *http.Transport, p Peers) func(msgs []raftpb.Message) { - c := &http.Client{Transport: t} - - scheme := "http" - if t.TLSClientConfig != nil { - scheme = "https" - } - - return func(msgs []raftpb.Message) { - for _, m := range msgs { - // TODO: reuse go routines - // limit the number of outgoing connections for the same receiver - go send(c, scheme, p, m) - } - } -} - -func send(c *http.Client, scheme string, p Peers, m raftpb.Message) { - // TODO (xiangli): reasonable retry logic - for i := 0; i < 3; i++ { - addr := p.Pick(m.To) - if addr == "" { - // TODO: unknown peer id.. what do we do? I - // don't think his should ever happen, need to - // look into this further. - log.Printf("etcdhttp: no addr for %d", m.To) - return - } - - url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix) - - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("etcdhttp: dropping message:", err) - return // drop bad message - } - if httpPost(c, url, data) { - return // success - } - // TODO: backoff - } -} - -func httpPost(c *http.Client, url string, data []byte) bool { - resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data)) - if err != nil { - // TODO: log the error? - return false - } - resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { - // TODO: log the error? - return false - } - return true -} diff --git a/etcdserver/etcdhttp/peers_test.go b/etcdserver/etcdhttp/peers_test.go deleted file mode 100644 index c4f7f04ff..000000000 --- a/etcdserver/etcdhttp/peers_test.go +++ /dev/null @@ -1,248 +0,0 @@ -package etcdhttp - -import ( - "net/http" - "net/http/httptest" - "reflect" - "sort" - "strings" - "testing" - - "github.com/coreos/etcd/raft/raftpb" -) - -func TestPeers(t *testing.T) { - tests := []struct { - in string - wids []int64 - wep []string - waddrs []string - wstring string - }{ - { - "1=1.1.1.1", - []int64{1}, - []string{"http://1.1.1.1"}, - []string{"1.1.1.1"}, - "1=1.1.1.1", - }, - { - "2=2.2.2.2", - []int64{2}, - []string{"http://2.2.2.2"}, - []string{"2.2.2.2"}, - "2=2.2.2.2", - }, - { - "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", - []int64{1, 2}, - []string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2"}, - []string{"1.1.1.1", "1.1.1.2", "2.2.2.2"}, - "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", - }, - { - "3=3.3.3.3&4=4.4.4.4&1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", - []int64{1, 2, 3, 4}, - []string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2", - "http://3.3.3.3", "http://4.4.4.4"}, - []string{"1.1.1.1", "1.1.1.2", "2.2.2.2", "3.3.3.3", "4.4.4.4"}, - "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2&3=3.3.3.3&4=4.4.4.4", - }, - } - for i, tt := range tests { - p := &Peers{} - err := p.Set(tt.in) - if err != nil { - t.Errorf("#%d: err=%v, want nil", i, err) - } - ids := int64Slice(p.IDs()) - sort.Sort(ids) - if !reflect.DeepEqual([]int64(ids), tt.wids) { - t.Errorf("#%d: IDs=%#v, want %#v", i, []int64(ids), tt.wids) - } - ep := p.Endpoints() - if !reflect.DeepEqual(ep, tt.wep) { - t.Errorf("#%d: Endpoints=%#v, want %#v", i, ep, tt.wep) - } - addrs := p.Addrs() - if !reflect.DeepEqual(addrs, tt.waddrs) { - t.Errorf("#%d: addrs=%#v, want %#v", i, ep, tt.waddrs) - } - s := p.String() - if s != tt.wstring { - t.Errorf("#%d: string=%q, want %q", i, s, tt.wstring) - } - } -} - -type int64Slice []int64 - -func (p int64Slice) Len() int { return len(p) } -func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -func TestPeersSetBad(t *testing.T) { - tests := []string{ - // garbage URL - "asdf%%", - // non-int64 keys - "a=1.2.3.4", - "-1-23=1.2.3.4", - } - for i, tt := range tests { - p := &Peers{} - if err := p.Set(tt); err == nil { - t.Errorf("#%d: err=nil unexpectedly", i) - } - } -} - -func TestPeersPick(t *testing.T) { - ps := &Peers{ - 1: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, - 2: []string{"xyz"}, - 3: []string{}, - } - ids := map[string]bool{ - "abc": true, - "def": true, - "ghi": true, - "jkl": true, - "mno": true, - "pqr": true, - "stu": true, - } - for i := 0; i < 1000; i++ { - a := ps.Pick(1) - if _, ok := ids[a]; !ok { - t.Errorf("returned ID %q not in expected range!", a) - break - } - } - if b := ps.Pick(2); b != "xyz" { - t.Errorf("id=%q, want %q", b, "xyz") - } - if c := ps.Pick(3); c != "" { - t.Errorf("id=%q, want \"\"", c) - } -} - -func TestHttpPost(t *testing.T) { - var tr *http.Request - tests := []struct { - h http.HandlerFunc - w bool - }{ - { - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tr = r - w.WriteHeader(http.StatusNoContent) - }), - true, - }, - { - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tr = r - w.WriteHeader(http.StatusNotFound) - }), - false, - }, - { - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tr = r - w.WriteHeader(http.StatusInternalServerError) - }), - false, - }, - } - for i, tt := range tests { - ts := httptest.NewServer(tt.h) - if g := httpPost(http.DefaultClient, ts.URL, []byte("adsf")); g != tt.w { - t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w) - } - if tr.Method != "POST" { - t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST") - } - if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" { - t.Errorf("#%d: Content-Type=%q, want %q", i, ct, "application/protobuf") - } - tr = nil - ts.Close() - } - - if httpPost(http.DefaultClient, "garbage url", []byte("data")) { - t.Errorf("httpPost with bad URL returned true unexpectedly!") - } -} - -func TestSend(t *testing.T) { - var tr *http.Request - var rc int - h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tr = r - w.WriteHeader(rc) - }) - tests := []struct { - m raftpb.Message - code int - - ok bool - }{ - { - // all good - raftpb.Message{ - To: 42, - Type: 4, - }, - http.StatusNoContent, - true, - }, - { - // bad response from server should be silently ignored - raftpb.Message{ - To: 42, - Type: 2, - }, - http.StatusInternalServerError, - true, - }, - { - // unknown destination! - raftpb.Message{ - To: 3, - Type: 2, - }, - 0, - false, - }, - } - - for i, tt := range tests { - tr = nil - rc = tt.code - ts := httptest.NewServer(h) - ps := Peers{ - 42: []string{strings.TrimPrefix(ts.URL, "http://")}, - } - send(http.DefaultClient, "http", ps, tt.m) - - if !tt.ok { - if tr != nil { - t.Errorf("#%d: got request=%#v, want nil", i, tr) - } - ts.Close() - continue - } - - if tr.Method != "POST" { - t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST") - } - if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" { - t.Errorf("#%d: Content-Type=%q, want %q", i, ct, "application/protobuf") - } - if tr.URL.String() != "/raft" { - t.Errorf("#%d: URL=%q, want %q", i, tr.URL.String(), "/raft") - } - ts.Close() - } -} diff --git a/etcdserver/member.go b/etcdserver/member.go new file mode 100644 index 000000000..f54820c00 --- /dev/null +++ b/etcdserver/member.go @@ -0,0 +1,43 @@ +package etcdserver + +import ( + "crypto/sha1" + "encoding/binary" + "path" + "sort" + "strconv" +) + +const machineKVPrefix = "/_etcd/machines/" + +type Member struct { + ID int64 + Name string + // TODO(philips): ensure these are URLs + PeerURLs []string + ClientURLs []string +} + +// newMember creates a Member without an ID and generates one based on the +// name, peer URLs. This is used for bootstrapping. +func newMember(name string, peerURLs []string) *Member { + sort.Strings(peerURLs) + m := &Member{Name: name, PeerURLs: peerURLs} + + b := []byte(m.Name) + for _, p := range m.PeerURLs { + b = append(b, []byte(p)...) + } + + hash := sha1.Sum(b) + m.ID = int64(binary.BigEndian.Uint64(hash[:8])) + if m.ID < 0 { + m.ID = m.ID * -1 + } + + return m +} + +func (m Member) storeKey() string { + return path.Join(machineKVPrefix, strconv.FormatUint(uint64(m.ID), 16)) +} diff --git a/etcdserver/server.go b/etcdserver/server.go index a20d05030..8a53a317f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -80,7 +80,7 @@ type EtcdServer struct { Node raft.Node Store store.Store - // Send specifies the send function for sending msgs to peers. Send + // Send specifies the send function for sending msgs to members. Send // MUST NOT block. It is okay to drop messages, since clients should // timeout and reissue their messages. If Send is nil, server will // panic. @@ -94,8 +94,9 @@ type EtcdServer struct { SnapCount int64 // number of entries to trigger a snapshot // Cache of the latest raft index and raft term the server has seen - raftIndex int64 - raftTerm int64 + raftIndex int64 + raftTerm int64 + ClusterStore ClusterStore } // Start prepares and starts server in a new goroutine. It is no longer safe to @@ -107,6 +108,8 @@ func (s *EtcdServer) Start() { } s.w = wait.New() s.done = make(chan struct{}) + // TODO: if this is an empty log, writes all peer infos + // into the first entry go s.run() } @@ -130,6 +133,7 @@ func (s *EtcdServer) run() { // TODO(bmizerany): do this in the background, but take // care to apply entries in a single goroutine, and not // race them. + // TODO: apply configuration change into ClusterStore. for _, e := range rd.CommittedEntries { switch e.Type { case raftpb.EntryNormal: diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 90615da6e..337cfc98a 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -382,14 +382,14 @@ func testServer(t *testing.T, ns int64) { } } - peers := make([]int64, ns) + members := make([]int64, ns) for i := int64(0); i < ns; i++ { - peers[i] = i + 1 + members[i] = i + 1 } for i := int64(0); i < ns; i++ { id := i + 1 - n := raft.StartNode(id, peers, 10, 1) + n := raft.StartNode(id, members, 10, 1) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() srv := &EtcdServer{ diff --git a/main.go b/main.go index 2992f2c74..3e136941a 100644 --- a/main.go +++ b/main.go @@ -36,14 +36,14 @@ const ( ) var ( - fid = flag.String("id", "0x1", "ID of this server") + name = flag.String("name", "default", "Unique human-readable name for this node") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") paddr = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')") dir = flag.String("data-dir", "", "Path to the data directory") snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = flag.Bool("version", false, "Print the version and exit") - peers = &etcdhttp.Peers{} + cluster = &etcdserver.Cluster{} addrs = &Addrs{} cors = &pkg.CORSInfo{} proxyFlag = new(ProxyFlag) @@ -78,11 +78,11 @@ var ( ) func init() { - flag.Var(peers, "peers", "your peers") + flag.Var(cluster, "bootstrap-config", "Initial cluster configuration for bootstrapping") flag.Var(addrs, "bind-addr", "List of HTTP service addresses (e.g., '127.0.0.1:4001,10.0.0.1:8080')") flag.Var(cors, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).") flag.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(proxyFlagValues, ", "))) - peers.Set("0x1=localhost:8080") + cluster.Set("default=localhost:8080") addrs.Set("127.0.0.1:4001") proxyFlag.Set(proxyFlagValueOff) @@ -122,16 +122,13 @@ func main() { // startEtcd launches the etcd server and HTTP handlers for client/server communication. func startEtcd() { - id, err := strconv.ParseInt(*fid, 0, 64) - if err != nil { - log.Fatal(err) - } - if id == raft.None { - log.Fatalf("etcd: cannot use None(%d) as etcdserver id", raft.None) + self := cluster.FindName(*name) + if self == nil { + log.Fatalf("etcd: no member with name=%q exists", *name) } - if peers.Pick(id) == "" { - log.Fatalf("%#x= must be specified in peers", id) + if self.ID == raft.None { + log.Fatalf("etcd: cannot use None(%d) as member id", raft.None) } if *snapCount <= 0 { @@ -139,7 +136,7 @@ func startEtcd() { } if *dir == "" { - *dir = fmt.Sprintf("%v_etcd_data", *fid) + *dir = fmt.Sprintf("%v_etcd_data", self.ID) log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, privateDirMode); err != nil { @@ -154,6 +151,7 @@ func startEtcd() { waldir := path.Join(*dir, "wal") var w *wal.WAL var n raft.Node + var err error st := store.New() if !wal.Exist(waldir) { @@ -161,7 +159,7 @@ func startEtcd() { if err != nil { log.Fatal(err) } - n = raft.StartNode(id, peers.IDs(), 10, 1) + n = raft.StartNode(self.ID, cluster.IDs(), 10, 1) } else { var index int64 snapshot, err := snapshotter.Load() @@ -186,7 +184,7 @@ func startEtcd() { if wid != 0 { log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) } - n = raft.RestartNode(id, peers.IDs(), 10, 1, snapshot, st, ents) + n = raft.RestartNode(self.ID, cluster.IDs(), 10, 1, snapshot, st, ents) } pt, err := transport.NewTransport(peerTLSInfo) @@ -194,6 +192,8 @@ func startEtcd() { log.Fatal(err) } + cls := etcdserver.NewClusterStore(st, *cluster) + s := &etcdserver.EtcdServer{ Store: st, Node: n, @@ -201,15 +201,16 @@ func startEtcd() { *wal.WAL *snap.Snapshotter }{w, snapshotter}, - Send: etcdhttp.Sender(pt, *peers), - Ticker: time.Tick(100 * time.Millisecond), - SyncTicker: time.Tick(500 * time.Millisecond), - SnapCount: *snapCount, + Send: etcdserver.Sender(pt, cls), + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), + SnapCount: *snapCount, + ClusterStore: cls, } s.Start() ch := &pkg.CORSHandler{ - Handler: etcdhttp.NewClientHandler(s, *peers, *timeout), + Handler: etcdhttp.NewClientHandler(s, cls, *timeout), Info: cors, } ph := etcdhttp.NewPeerHandler(s) @@ -247,7 +248,7 @@ func startProxy() { log.Fatal(err) } - ph, err := proxy.NewHandler(pt, (*peers).Addrs()) + ph, err := proxy.NewHandler(pt, (*cluster).Endpoints()) if err != nil { log.Fatal(err) }