mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			440 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			440 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2015 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package rafthttp
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"net/http/httptest"
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/coreos/etcd/etcdserver/stats"
 | |
| 	"github.com/coreos/etcd/pkg/testutil"
 | |
| 	"github.com/coreos/etcd/pkg/types"
 | |
| 	"github.com/coreos/etcd/raft/raftpb"
 | |
| 	"github.com/coreos/etcd/version"
 | |
| 	"github.com/coreos/go-semver/semver"
 | |
| )
 | |
| 
 | |
| // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
 | |
| // to streamWriter. After that, streamWriter can use it to send messages
 | |
| // continuously, and closes it when stopped.
 | |
| func TestStreamWriterAttachOutgoingConn(t *testing.T) {
 | |
| 	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 | |
| 	// the expected initial state of streamWriter is not working
 | |
| 	if _, ok := sw.writec(); ok {
 | |
| 		t.Errorf("initial working status = %v, want false", ok)
 | |
| 	}
 | |
| 
 | |
| 	// repeat tests to ensure streamWriter can use last attached connection
 | |
| 	var wfc *fakeWriteFlushCloser
 | |
| 	for i := 0; i < 3; i++ {
 | |
| 		prevwfc := wfc
 | |
| 		wfc = newFakeWriteFlushCloser(nil)
 | |
| 		sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
 | |
| 
 | |
| 		// previous attached connection should be closed
 | |
| 		if prevwfc != nil {
 | |
| 			select {
 | |
| 			case <-prevwfc.closed:
 | |
| 			case <-time.After(time.Second):
 | |
| 				t.Errorf("#%d: close of previous connection timed out", i)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// if prevwfc != nil, the new msgc is ready since prevwfc has closed
 | |
| 		// if prevwfc == nil, the first connection may be pending, but the first
 | |
| 		// msgc is already available since it's set on calling startStreamwriter
 | |
| 		msgc, _ := sw.writec()
 | |
| 		msgc <- raftpb.Message{}
 | |
| 
 | |
| 		select {
 | |
| 		case <-wfc.writec:
 | |
| 		case <-time.After(time.Second):
 | |
| 			t.Errorf("#%d: failed to write to the underlying connection", i)
 | |
| 		}
 | |
| 		// write chan is still available
 | |
| 		if _, ok := sw.writec(); !ok {
 | |
| 			t.Errorf("#%d: working status = %v, want true", i, ok)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	sw.stop()
 | |
| 	// write chan is unavailable since the writer is stopped.
 | |
| 	if _, ok := sw.writec(); ok {
 | |
| 		t.Errorf("working status after stop = %v, want false", ok)
 | |
| 	}
 | |
| 	if !wfc.Closed() {
 | |
| 		t.Errorf("failed to close the underlying connection")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
 | |
| // outgoingConn will close the outgoingConn and fall back to non-working status.
 | |
| func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
 | |
| 	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 | |
| 	defer sw.stop()
 | |
| 	wfc := newFakeWriteFlushCloser(errors.New("blah"))
 | |
| 	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
 | |
| 
 | |
| 	sw.msgc <- raftpb.Message{}
 | |
| 	select {
 | |
| 	case <-wfc.closed:
 | |
| 	case <-time.After(time.Second):
 | |
| 		t.Errorf("failed to close the underlying connection in time")
 | |
| 	}
 | |
| 	// no longer working
 | |
| 	if _, ok := sw.writec(); ok {
 | |
| 		t.Errorf("working = %v, want false", ok)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestStreamReaderDialRequest(t *testing.T) {
 | |
| 	for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
 | |
| 		tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
 | |
| 		sr := &streamReader{
 | |
| 			peerID: types.ID(2),
 | |
| 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
 | |
| 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 | |
| 		}
 | |
| 		sr.dial(tt)
 | |
| 
 | |
| 		act, err := tr.rec.Wait(1)
 | |
| 		if err != nil {
 | |
| 			t.Fatal(err)
 | |
| 		}
 | |
| 		req := act[0].Params[0].(*http.Request)
 | |
| 
 | |
| 		wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1")
 | |
| 		if req.URL.String() != wurl {
 | |
| 			t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
 | |
| 		}
 | |
| 		if w := "GET"; req.Method != w {
 | |
| 			t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
 | |
| 		}
 | |
| 		if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
 | |
| 			t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
 | |
| 		}
 | |
| 		if g := req.Header.Get("X-Raft-To"); g != "2" {
 | |
| 			t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestStreamReaderDialResult tests the result of the dial func call meets the
 | |
| // HTTP response received.
 | |
| func TestStreamReaderDialResult(t *testing.T) {
 | |
| 	tests := []struct {
 | |
| 		code  int
 | |
| 		err   error
 | |
| 		wok   bool
 | |
| 		whalt bool
 | |
| 	}{
 | |
| 		{0, errors.New("blah"), false, false},
 | |
| 		{http.StatusOK, nil, true, false},
 | |
| 		{http.StatusMethodNotAllowed, nil, false, false},
 | |
| 		{http.StatusNotFound, nil, false, false},
 | |
| 		{http.StatusPreconditionFailed, nil, false, false},
 | |
| 		{http.StatusGone, nil, false, true},
 | |
| 	}
 | |
| 	for i, tt := range tests {
 | |
| 		h := http.Header{}
 | |
| 		h.Add("X-Server-Version", version.Version)
 | |
| 		tr := &respRoundTripper{
 | |
| 			code:   tt.code,
 | |
| 			header: h,
 | |
| 			err:    tt.err,
 | |
| 		}
 | |
| 		sr := &streamReader{
 | |
| 			peerID: types.ID(2),
 | |
| 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 | |
| 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 | |
| 			errorc: make(chan error, 1),
 | |
| 		}
 | |
| 
 | |
| 		_, err := sr.dial(streamTypeMessage)
 | |
| 		if ok := err == nil; ok != tt.wok {
 | |
| 			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
 | |
| 		}
 | |
| 		if halt := len(sr.errorc) > 0; halt != tt.whalt {
 | |
| 			t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
 | |
| func TestStreamReaderStopOnDial(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 	h := http.Header{}
 | |
| 	h.Add("X-Server-Version", version.Version)
 | |
| 	tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
 | |
| 	sr := &streamReader{
 | |
| 		peerID: types.ID(2),
 | |
| 		tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 | |
| 		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 | |
| 		errorc: make(chan error, 1),
 | |
| 		typ:    streamTypeMessage,
 | |
| 		status: newPeerStatus(types.ID(2)),
 | |
| 	}
 | |
| 	tr.onResp = func() {
 | |
| 		// stop() waits for the run() goroutine to exit, but that exit
 | |
| 		// needs a response from RoundTrip() first; use goroutine
 | |
| 		go sr.stop()
 | |
| 		// wait so that stop() is blocked on run() exiting
 | |
| 		time.Sleep(10 * time.Millisecond)
 | |
| 		// sr.run() completes dialing then begins decoding while stopped
 | |
| 	}
 | |
| 	sr.start()
 | |
| 	select {
 | |
| 	case <-sr.done:
 | |
| 	case <-time.After(time.Second):
 | |
| 		t.Fatal("streamReader did not stop in time")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type respWaitRoundTripper struct {
 | |
| 	rrt    *respRoundTripper
 | |
| 	onResp func()
 | |
| }
 | |
| 
 | |
| func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
 | |
| 	resp, err := t.rrt.RoundTrip(req)
 | |
| 	resp.Body = newWaitReadCloser()
 | |
| 	t.onResp()
 | |
| 	return resp, err
 | |
| }
 | |
| 
 | |
| type waitReadCloser struct{ closec chan struct{} }
 | |
| 
 | |
| func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
 | |
| func (wrc *waitReadCloser) Read(p []byte) (int, error) {
 | |
| 	<-wrc.closec
 | |
| 	return 0, io.EOF
 | |
| }
 | |
| func (wrc *waitReadCloser) Close() error {
 | |
| 	close(wrc.closec)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // TestStreamReaderDialDetectUnsupport tests that dial func could find
 | |
| // out that the stream type is not supported by the remote.
 | |
| func TestStreamReaderDialDetectUnsupport(t *testing.T) {
 | |
| 	for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
 | |
| 		// the response from etcd 2.0
 | |
| 		tr := &respRoundTripper{
 | |
| 			code:   http.StatusNotFound,
 | |
| 			header: http.Header{},
 | |
| 		}
 | |
| 		sr := &streamReader{
 | |
| 			peerID: types.ID(2),
 | |
| 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 | |
| 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
 | |
| 		}
 | |
| 
 | |
| 		_, err := sr.dial(typ)
 | |
| 		if err != errUnsupportedStreamType {
 | |
| 			t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestStream tests that streamReader and streamWriter can build stream to
 | |
| // send messages between each other.
 | |
| func TestStream(t *testing.T) {
 | |
| 	recvc := make(chan raftpb.Message, streamBufSize)
 | |
| 	propc := make(chan raftpb.Message, streamBufSize)
 | |
| 	msgapp := raftpb.Message{
 | |
| 		Type:    raftpb.MsgApp,
 | |
| 		From:    2,
 | |
| 		To:      1,
 | |
| 		Term:    1,
 | |
| 		LogTerm: 1,
 | |
| 		Index:   3,
 | |
| 		Entries: []raftpb.Entry{{Term: 1, Index: 4}},
 | |
| 	}
 | |
| 
 | |
| 	tests := []struct {
 | |
| 		t  streamType
 | |
| 		m  raftpb.Message
 | |
| 		wc chan raftpb.Message
 | |
| 	}{
 | |
| 		{
 | |
| 			streamTypeMessage,
 | |
| 			raftpb.Message{Type: raftpb.MsgProp, To: 2},
 | |
| 			propc,
 | |
| 		},
 | |
| 		{
 | |
| 			streamTypeMessage,
 | |
| 			msgapp,
 | |
| 			recvc,
 | |
| 		},
 | |
| 		{
 | |
| 			streamTypeMsgAppV2,
 | |
| 			msgapp,
 | |
| 			recvc,
 | |
| 		},
 | |
| 	}
 | |
| 	for i, tt := range tests {
 | |
| 		h := &fakeStreamHandler{t: tt.t}
 | |
| 		srv := httptest.NewServer(h)
 | |
| 		defer srv.Close()
 | |
| 
 | |
| 		sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
 | |
| 		defer sw.stop()
 | |
| 		h.sw = sw
 | |
| 
 | |
| 		picker := mustNewURLPicker(t, []string{srv.URL})
 | |
| 		tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
 | |
| 
 | |
| 		sr := &streamReader{
 | |
| 			peerID: types.ID(2),
 | |
| 			typ:    tt.t,
 | |
| 			tr:     tr,
 | |
| 			picker: picker,
 | |
| 			status: newPeerStatus(types.ID(2)),
 | |
| 			recvc:  recvc,
 | |
| 			propc:  propc,
 | |
| 		}
 | |
| 		sr.start()
 | |
| 
 | |
| 		// wait for stream to work
 | |
| 		var writec chan<- raftpb.Message
 | |
| 		for {
 | |
| 			var ok bool
 | |
| 			if writec, ok = sw.writec(); ok {
 | |
| 				break
 | |
| 			}
 | |
| 			time.Sleep(time.Millisecond)
 | |
| 		}
 | |
| 
 | |
| 		writec <- tt.m
 | |
| 		var m raftpb.Message
 | |
| 		select {
 | |
| 		case m = <-tt.wc:
 | |
| 		case <-time.After(time.Second):
 | |
| 			t.Fatalf("#%d: failed to receive message from the channel", i)
 | |
| 		}
 | |
| 		if !reflect.DeepEqual(m, tt.m) {
 | |
| 			t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
 | |
| 		}
 | |
| 
 | |
| 		sr.stop()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestCheckStreamSupport(t *testing.T) {
 | |
| 	tests := []struct {
 | |
| 		v *semver.Version
 | |
| 		t streamType
 | |
| 		w bool
 | |
| 	}{
 | |
| 		// support
 | |
| 		{
 | |
| 			semver.Must(semver.NewVersion("2.1.0")),
 | |
| 			streamTypeMsgAppV2,
 | |
| 			true,
 | |
| 		},
 | |
| 		// ignore patch
 | |
| 		{
 | |
| 			semver.Must(semver.NewVersion("2.1.9")),
 | |
| 			streamTypeMsgAppV2,
 | |
| 			true,
 | |
| 		},
 | |
| 		// ignore prerelease
 | |
| 		{
 | |
| 			semver.Must(semver.NewVersion("2.1.0-alpha")),
 | |
| 			streamTypeMsgAppV2,
 | |
| 			true,
 | |
| 		},
 | |
| 	}
 | |
| 	for i, tt := range tests {
 | |
| 		if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
 | |
| 			t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type fakeWriteFlushCloser struct {
 | |
| 	mu      sync.Mutex
 | |
| 	err     error
 | |
| 	written int
 | |
| 	closed  chan struct{}
 | |
| 	writec  chan struct{}
 | |
| }
 | |
| 
 | |
| func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
 | |
| 	return &fakeWriteFlushCloser{
 | |
| 		err:    err,
 | |
| 		closed: make(chan struct{}),
 | |
| 		writec: make(chan struct{}, 1),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
 | |
| 	wfc.mu.Lock()
 | |
| 	defer wfc.mu.Unlock()
 | |
| 	select {
 | |
| 	case wfc.writec <- struct{}{}:
 | |
| 	default:
 | |
| 	}
 | |
| 	wfc.written += len(p)
 | |
| 	return len(p), wfc.err
 | |
| }
 | |
| 
 | |
| func (wfc *fakeWriteFlushCloser) Flush() {}
 | |
| 
 | |
| func (wfc *fakeWriteFlushCloser) Close() error {
 | |
| 	close(wfc.closed)
 | |
| 	return wfc.err
 | |
| }
 | |
| 
 | |
| func (wfc *fakeWriteFlushCloser) Written() int {
 | |
| 	wfc.mu.Lock()
 | |
| 	defer wfc.mu.Unlock()
 | |
| 	return wfc.written
 | |
| }
 | |
| 
 | |
| func (wfc *fakeWriteFlushCloser) Closed() bool {
 | |
| 	select {
 | |
| 	case <-wfc.closed:
 | |
| 		return true
 | |
| 	default:
 | |
| 		return false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type fakeStreamHandler struct {
 | |
| 	t  streamType
 | |
| 	sw *streamWriter
 | |
| }
 | |
| 
 | |
| func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| 	w.Header().Add("X-Server-Version", version.Version)
 | |
| 	w.(http.Flusher).Flush()
 | |
| 	c := newCloseNotifier()
 | |
| 	h.sw.attach(&outgoingConn{
 | |
| 		t:       h.t,
 | |
| 		Writer:  w,
 | |
| 		Flusher: w.(http.Flusher),
 | |
| 		Closer:  c,
 | |
| 	})
 | |
| 	<-c.closeNotify()
 | |
| }
 | 
