diff --git a/raft_server.go b/raft_server.go index aeaa377a4..c123d5c80 100644 --- a/raft_server.go +++ b/raft_server.go @@ -30,7 +30,7 @@ var r *raftServer func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raft.DefaultElectionTimeout) + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout) // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil) @@ -169,7 +169,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { // getVersion fetches the raft version of a peer. This works for now but we // will need to do something more sophisticated later when we allow mixed // version clusters. -func getVersion(t transporter, versionURL url.URL) (string, error) { +func getVersion(t *transporter, versionURL url.URL) (string, error) { resp, err := t.Get(versionURL.String()) if err != nil { @@ -198,6 +198,7 @@ func joinCluster(cluster []string) bool { if _, ok := err.(etcdErr.Error); ok { fatal(err) } + debugf("cannot join to cluster via machine %s %s", machine, err) } } @@ -209,7 +210,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { var b bytes.Buffer // t must be ok - t, _ := r.Transporter().(transporter) + t, _ := r.Transporter().(*transporter) // Our version must match the leaders version versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} diff --git a/transporter.go b/transporter.go index 66a179791..0a871ad09 100644 --- a/transporter.go +++ b/transporter.go @@ -18,10 +18,16 @@ type transporter struct { timeout time.Duration } +// response struct +type transporterResponse struct { + resp *http.Response + err error +} + // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) transporter { +func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter { t := transporter{} tr := &http.Transport{ @@ -36,7 +42,7 @@ func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) tr t.client = &http.Client{Transport: tr} t.timeout = timeout - return t + return &t } // Dial with timeout @@ -45,7 +51,7 @@ func dialTimeout(network, addr string) (net.Conn, error) { } // Sends AppendEntries RPCs to a peer when the server is the leader. -func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { +func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -72,7 +78,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe } // Sends RequestVote RPCs to a peer when the server is the candidate. -func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { +func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { var rvrsp *raft.RequestVoteResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -98,7 +104,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req * } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. -func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { +func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { var aersp *raft.SnapshotResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -126,7 +132,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. -func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { +func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { var aersp *raft.SnapshotRecoveryResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) @@ -153,41 +159,35 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft } // Send server side POST request -func (t transporter) Post(path string, body io.Reader) (*http.Response, error) { - - postChan := make(chan interface{}, 1) +func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) { + c := make(chan *transporterResponse, 1) + go func() { - resp, err := t.client.Post(path, "application/json", body) - if err == nil { - postChan <- resp - } else { - postChan <- err - } + tr := new(transporterResponse) + tr.resp, tr.err = t.client.Post(path, "application/json", body) + c <-tr }() - return t.waitResponse(postChan) + return t.waitResponse(c) } // Send server side GET request -func (t transporter) Get(path string) (*http.Response, error) { - - getChan := make(chan interface{}, 1) +func (t *transporter) Get(path string) (*http.Response, error) { + c := make(chan *transporterResponse, 1) + go func() { - resp, err := t.client.Get(path) - if err == nil { - getChan <- resp - } else { - getChan <- err - } + tr := new(transporterResponse) + tr.resp, tr.err = t.client.Get(path) + c <-tr }() - return t.waitResponse(getChan) + return t.waitResponse(c) } -func (t transporter) waitResponse(responseChan chan interface{}) (*http.Response, error) { +func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) { timeoutChan := time.After(t.timeout) @@ -195,17 +195,10 @@ func (t transporter) waitResponse(responseChan chan interface{}) (*http.Response case <-timeoutChan: return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout) - case r := <-responseChan: - switch r := r.(type) { - case error: - return nil, r - - case *http.Response: - return r, nil - - } + case r:= <-responseChan: + return r.resp, r.err } // for complier return nil, nil -} +} \ No newline at end of file diff --git a/transporter_test.go b/transporter_test.go index 7dec4ea2e..ac1ecc975 100644 --- a/transporter_test.go +++ b/transporter_test.go @@ -12,6 +12,7 @@ func TestTransporterTimeout(t *testing.T) { ts := newTransporter("http", conf, time.Second) + ts.Get("http://google.com") _, err := ts.Get("http://google.com:9999") // it doesn't exisit if err == nil || err.Error() != "Wait Response Timeout: 1s" { t.Fatal("timeout error: ", err.Error( ))