diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index fc1288dad..4a67f270d 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -1,7 +1,6 @@ package etcdhttp import ( - "bytes" "encoding/binary" "encoding/json" "errors" @@ -11,13 +10,11 @@ import ( "log" "net/http" "net/url" - "sort" "strconv" "strings" "time" crand "crypto/rand" - "math/rand" "github.com/coreos/etcd/elog" etcdErr "github.com/coreos/etcd/error" @@ -31,131 +28,12 @@ import ( const ( keysPrefix = "/v2/keys" machinesPrefix = "/v2/machines" + + DefaultTimeout = 500 * time.Millisecond ) -type Peers map[int64][]string - -func (ps Peers) Pick(id int64) string { - addrs := ps[id] - if len(addrs) == 0 { - return "" - } - return addScheme(addrs[rand.Intn(len(addrs))]) -} - -// TODO: improve this when implementing TLS -func addScheme(addr string) string { - return fmt.Sprintf("http://%s", addr) -} - -// Set parses command line sets of names to ips formatted like: -// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2 -func (ps *Peers) Set(s string) error { - m := make(map[int64][]string) - v, err := url.ParseQuery(s) - if err != nil { - return err - } - for k, v := range v { - id, err := strconv.ParseInt(k, 0, 64) - if err != nil { - return err - } - m[id] = v - } - *ps = m - return nil -} - -func (ps *Peers) String() string { - v := url.Values{} - for k, vv := range *ps { - for i := range vv { - v.Add(strconv.FormatInt(k, 16), vv[i]) - } - } - return v.Encode() -} - -func (ps Peers) IDs() []int64 { - var ids []int64 - for id := range ps { - ids = append(ids, id) - } - return ids -} - -// Endpoints returns a list of all peer addresses. Each address is -// prefixed with "http://". The returned list is sorted (asc). -func (ps Peers) Endpoints() []string { - endpoints := make([]string, 0) - for _, addrs := range ps { - for _, addr := range addrs { - endpoints = append(endpoints, addScheme(addr)) - } - } - sort.Strings(endpoints) - - return endpoints -} - var errClosed = errors.New("etcdhttp: client closed connection") -const DefaultTimeout = 500 * time.Millisecond - -func Sender(p Peers) func(msgs []raftpb.Message) { - return func(msgs []raftpb.Message) { - for _, m := range msgs { - // TODO: reuse go routines - // limit the number of outgoing connections for the same receiver - go send(p, m) - } - } -} - -func send(p Peers, m raftpb.Message) { - // TODO (xiangli): reasonable retry logic - for i := 0; i < 3; i++ { - url := p.Pick(m.To) - if url == "" { - // TODO: unknown peer id.. what do we do? I - // don't think his should ever happen, need to - // look into this further. - log.Println("etcdhttp: no addr for %d", m.To) - return - } - - url += "/raft" - - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("etcdhttp: dropping message:", err) - return // drop bad message - } - if httpPost(url, data) { - return // success - } - // TODO: backoff - } -} - -func httpPost(url string, data []byte) bool { - // TODO: set timeouts - resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data)) - if err != nil { - elog.TODO() - return false - } - resp.Body.Close() - if resp.StatusCode != 200 { - elog.TODO() - return false - } - return true -} - // Handler implements the http.Handler interface and serves etcd client and // raft communication. type Handler struct { diff --git a/etcdserver/etcdhttp/peers.go b/etcdserver/etcdhttp/peers.go new file mode 100644 index 000000000..bba16fc88 --- /dev/null +++ b/etcdserver/etcdhttp/peers.go @@ -0,0 +1,139 @@ +package etcdhttp + +import ( + "bytes" + "fmt" + "log" + "math/rand" + "net/http" + "net/url" + "sort" + "strconv" + + "github.com/coreos/etcd/elog" + "github.com/coreos/etcd/raft/raftpb" +) + +// Peers contains a mapping of unique IDs to a list of hostnames/IP addresses +type Peers map[int64][]string + +// addScheme adds the protocol prefix to a string; currently only HTTP +// TODO: improve this when implementing TLS +func addScheme(addr string) string { + return fmt.Sprintf("http://%s", addr) +} + +// Pick chooses a random address from a given Peer's addresses, and returns it as +// an addressible URI. If the given peer does not exist, an empty string is returned. +func (ps Peers) Pick(id int64) string { + addrs := ps[id] + if len(addrs) == 0 { + return "" + } + return addScheme(addrs[rand.Intn(len(addrs))]) +} + +// Set parses command line sets of names to IPs formatted like: +// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2 +func (ps *Peers) Set(s string) error { + m := make(map[int64][]string) + v, err := url.ParseQuery(s) + if err != nil { + return err + } + for k, v := range v { + id, err := strconv.ParseInt(k, 0, 64) + if err != nil { + return err + } + m[id] = v + } + *ps = m + return nil +} + +func (ps *Peers) String() string { + v := url.Values{} + for k, vv := range *ps { + for i := range vv { + v.Add(strconv.FormatInt(k, 16), vv[i]) + } + } + return v.Encode() +} + +func (ps Peers) IDs() []int64 { + var ids []int64 + for id := range ps { + ids = append(ids, id) + } + return ids +} + +// Endpoints returns a list of all peer addresses. Each address is prefixed +// with the scheme (currently "http://"). The returned list is sorted in +// ascending lexicographical order. +func (ps Peers) Endpoints() []string { + endpoints := make([]string, 0) + for _, addrs := range ps { + for _, addr := range addrs { + endpoints = append(endpoints, addScheme(addr)) + } + } + sort.Strings(endpoints) + + return endpoints +} + +func Sender(p Peers) func(msgs []raftpb.Message) { + return func(msgs []raftpb.Message) { + for _, m := range msgs { + // TODO: reuse go routines + // limit the number of outgoing connections for the same receiver + go send(p, m) + } + } +} + +func send(p Peers, m raftpb.Message) { + // TODO (xiangli): reasonable retry logic + for i := 0; i < 3; i++ { + url := p.Pick(m.To) + if url == "" { + // TODO: unknown peer id.. what do we do? I + // don't think his should ever happen, need to + // look into this further. + log.Println("etcdhttp: no addr for %d", m.To) + return + } + + url += "/raft" + + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data, err := m.Marshal() + if err != nil { + log.Println("etcdhttp: dropping message:", err) + return // drop bad message + } + if httpPost(url, data) { + return // success + } + // TODO: backoff + } +} + +func httpPost(url string, data []byte) bool { + // TODO: set timeouts + resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data)) + if err != nil { + elog.TODO() + return false + } + resp.Body.Close() + if resp.StatusCode != 200 { + elog.TODO() + return false + } + return true +} diff --git a/etcdserver/etcdhttp/peers_test.go b/etcdserver/etcdhttp/peers_test.go index 83bc6fa31..4942fd006 100644 --- a/etcdserver/etcdhttp/peers_test.go +++ b/etcdserver/etcdhttp/peers_test.go @@ -1,19 +1,157 @@ package etcdhttp -import "testing" +import ( + "net/http" + "net/http/httptest" + "reflect" + "sort" + "testing" +) -//TODO: full testing for peer set -func TestPeerSet(t *testing.T) { - p := &Peers{} - tests := []string{ - "1=1.1.1.1", - "2=2.2.2.2", - "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", +func TestPeers(t *testing.T) { + tests := []struct { + in string + wids []int64 + wep []string + wstring string + }{ + { + "1=1.1.1.1", + []int64{1}, + []string{"http://1.1.1.1"}, + "1=1.1.1.1", + }, + { + "2=2.2.2.2", + []int64{2}, + []string{"http://2.2.2.2"}, + "2=2.2.2.2", + }, + { + "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", + []int64{1, 2}, + []string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2"}, + "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", + }, + { + "3=3.3.3.3&4=4.4.4.4&1=1.1.1.1&1=1.1.1.2&2=2.2.2.2", + []int64{1, 2, 3, 4}, + []string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2", + "http://3.3.3.3", "http://4.4.4.4"}, + "1=1.1.1.1&1=1.1.1.2&2=2.2.2.2&3=3.3.3.3&4=4.4.4.4", + }, } for i, tt := range tests { - p.Set(tt) - if p.String() != tt { - t.Errorf("#%d: string = %s, want %s", i, p.String(), tt) + p := &Peers{} + err := p.Set(tt.in) + if err != nil { + t.Errorf("#%d: err=%v, want nil", i, err) + } + ids := int64Slice(p.IDs()) + sort.Sort(ids) + if !reflect.DeepEqual([]int64(ids), tt.wids) { + t.Errorf("#%d: IDs=%#v, want %#v", i, []int64(ids), tt.wids) + } + ep := p.Endpoints() + if !reflect.DeepEqual(ep, tt.wep) { + t.Errorf("#%d: Endpoints=%#v, want %#v", i, ep, tt.wep) + } + s := p.String() + if s != tt.wstring { + t.Errorf("#%d: string=%q, want %q", i, s, tt.wstring) } } } + +type int64Slice []int64 + +func (p int64Slice) Len() int { return len(p) } +func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func TestPeersSetBad(t *testing.T) { + tests := []string{ + // garbage URL + "asdf%%", + // non-int64 keys + "a=1.2.3.4", + "-1-23=1.2.3.4", + } + for i, tt := range tests { + p := &Peers{} + if err := p.Set(tt); err == nil { + t.Errorf("#%d: err=nil unexpectedly", i) + } + } +} + +func TestPeersPick(t *testing.T) { + ps := &Peers{ + 1: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, + 2: []string{"xyz"}, + 3: []string{}, + } + ids := map[string]bool{ + "http://abc": true, + "http://def": true, + "http://ghi": true, + "http://jkl": true, + "http://mno": true, + "http://pqr": true, + "http://stu": true, + } + for i := 0; i < 1000; i++ { + a := ps.Pick(1) + if _, ok := ids[a]; !ok { + t.Errorf("returned ID %q not in expected range!", a) + break + } + } + if b := ps.Pick(2); b != "http://xyz" { + t.Errorf("id=%q, want %q", b, "http://xyz") + } + if c := ps.Pick(3); c != "" { + t.Errorf("id=%q, want \"\"", c) + } +} + +func TestHttpPost(t *testing.T) { + var tr *http.Request + tests := []struct { + h http.HandlerFunc + w bool + }{ + { + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tr = r + w.WriteHeader(200) + }), + true, + }, + { + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tr = r + w.WriteHeader(404) + }), + false, + }, + } + for i, tt := range tests { + ts := httptest.NewServer(tt.h) + if g := httpPost(ts.URL, []byte("adsf")); g != tt.w { + t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w) + } + if tr.Method != "POST" { + t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST") + } + if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" { + t.Errorf("%#d: Content-Type=%q, want %q", ct, "application/protobuf") + } + tr = nil + ts.Close() + } + + if httpPost("garbage url", []byte("data")) { + t.Errorf("httpPost with bad URL returned true unexpectedly!") + } +}