From cbec48e8f6bb1da4f9fce9127c29a60438338de3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 5 Sep 2014 13:53:49 -0700 Subject: [PATCH] etcdhttp: non-blocking sender --- etcdserver/etcdhttp/http.go | 59 ++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 6763d985f..713c60b33 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -73,37 +73,41 @@ const DefaultTimeout = 500 * time.Millisecond func Sender(p Peers) func(msgs []raftpb.Message) { return func(msgs []raftpb.Message) { for _, m := range msgs { - // TODO: create workers that deal with message sending - // concurrently as to not block progress - for { - url := p.Pick(m.To) - if url == "" { - // TODO: unknown peer id.. what do we do? I - // don't think his should ever happen, need to - // look into this further. - log.Println("etcdhttp: no addr for %d", m.To) - break - } - - url += "/raft" - - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("etcdhttp: dropping message:", err) - break // drop bad message - } - if httpPost(url, data) { - break // success - } - - // TODO: backoff - } + // TODO: reuse go routines + // limit the number of outgoing connections for the same receiver + go send(p, m) } } } +func send(p Peers, m raftpb.Message) { + // TODO (xiangli): reasonable retry logic + for i := 0; i < 3; i++ { + url := p.Pick(m.To) + if url == "" { + // TODO: unknown peer id.. what do we do? I + // don't think his should ever happen, need to + // look into this further. + log.Println("etcdhttp: no addr for %d", m.To) + return + } + + url += "/raft" + + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data, err := m.Marshal() + if err != nil { + log.Println("etcdhttp: dropping message:", err) + return // drop bad message + } + if httpPost(url, data) { + return // success + } + // TODO: backoff + } +} + func httpPost(url string, data []byte) bool { // TODO: set timeouts resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data)) @@ -111,6 +115,7 @@ func httpPost(url string, data []byte) bool { elog.TODO() return false } + resp.Body.Close() if resp.StatusCode != 200 { elog.TODO() return false