From fc9fc3f88896793b92031786bd3c92ab86236f6b Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Sun, 31 Aug 2014 19:56:02 -0700 Subject: [PATCH] ... --- etcdserver2/etcdhttp/http.go | 58 ++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/etcdserver2/etcdhttp/http.go b/etcdserver2/etcdhttp/http.go index 692a57520..bb1faf2ad 100644 --- a/etcdserver2/etcdhttp/http.go +++ b/etcdserver2/etcdhttp/http.go @@ -1,6 +1,7 @@ package etcdhttp import ( + "bytes" "encoding/binary" "encoding/json" "errors" @@ -13,7 +14,8 @@ import ( "strings" "time" - "crypto/rand" + crand "crypto/rand" + "math/rand" "code.google.com/p/go.net/context" "github.com/coreos/etcd/elog" etcdserver "github.com/coreos/etcd/etcdserver2" @@ -22,10 +24,62 @@ import ( "github.com/coreos/etcd/store" ) +type Peers map[int64][]string + +func (ps Peers) Pick(id int64) string { + addrs := ps[id] + return addrs[rand.Intn(len(addrs))] +} + var errClosed = errors.New("etcdhttp: client closed connection") const DefaultTimeout = 500 * time.Millisecond +func Sender(prefix string, p Peers) func(msgs []raftpb.Message) { + return func(msgs []raftpb.Message) { + for _, m := range msgs { + // TODO: create workers that deal with message sending + // concurrently as to not block progress + for { + url := p.Pick(m.To) + if url == "" { + // TODO: unknown peer id.. what do we do? I + // don't think his should ever happen, need to + // look into this further. + elog.TODO() + } + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data, err := m.Marshal() + if err != nil { + elog.TODO() + break // drop bad message + } + if httpPost(url+prefix, data) { + break // success + } + + // TODO: backoff + } + } + } +} + +func httpPost(url string, data []byte) bool { + resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data)) + if err != nil { + elog.TODO() + return false + } + if resp.StatusCode != 200 { + elog.TODO() + return false + } + return true +} + +// Handler implements the http.Handler interface and serves etcd client and +// raft communication. type Handler struct { Timeout time.Duration Server *etcdserver.Server @@ -90,7 +144,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R func genId() int64 { for { b := make([]byte, 8) - if _, err := io.ReadFull(rand.Reader, b); err != nil { + if _, err := io.ReadFull(crand.Reader, b); err != nil { panic(err) // really bad stuff happened } n := int64(binary.BigEndian.Uint64(b))