mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
440 lines
11 KiB
Go
440 lines
11 KiB
Go
// Copyright 2015 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rafthttp
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"reflect"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
"github.com/coreos/etcd/pkg/testutil"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
"github.com/coreos/etcd/version"
|
|
"github.com/coreos/go-semver/semver"
|
|
)
|
|
|
|
// TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
|
|
// to streamWriter. After that, streamWriter can use it to send messages
|
|
// continuously, and closes it when stopped.
|
|
func TestStreamWriterAttachOutgoingConn(t *testing.T) {
|
|
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
|
|
// the expected initial state of streamWriter is not working
|
|
if _, ok := sw.writec(); ok {
|
|
t.Errorf("initial working status = %v, want false", ok)
|
|
}
|
|
|
|
// repeat tests to ensure streamWriter can use last attached connection
|
|
var wfc *fakeWriteFlushCloser
|
|
for i := 0; i < 3; i++ {
|
|
prevwfc := wfc
|
|
wfc = newFakeWriteFlushCloser(nil)
|
|
sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
|
|
|
|
// previous attached connection should be closed
|
|
if prevwfc != nil {
|
|
select {
|
|
case <-prevwfc.closed:
|
|
case <-time.After(time.Second):
|
|
t.Errorf("#%d: close of previous connection timed out", i)
|
|
}
|
|
}
|
|
|
|
// if prevwfc != nil, the new msgc is ready since prevwfc has closed
|
|
// if prevwfc == nil, the first connection may be pending, but the first
|
|
// msgc is already available since it's set on calling startStreamwriter
|
|
msgc, _ := sw.writec()
|
|
msgc <- raftpb.Message{}
|
|
|
|
select {
|
|
case <-wfc.writec:
|
|
case <-time.After(time.Second):
|
|
t.Errorf("#%d: failed to write to the underlying connection", i)
|
|
}
|
|
// write chan is still available
|
|
if _, ok := sw.writec(); !ok {
|
|
t.Errorf("#%d: working status = %v, want true", i, ok)
|
|
}
|
|
}
|
|
|
|
sw.stop()
|
|
// write chan is unavailable since the writer is stopped.
|
|
if _, ok := sw.writec(); ok {
|
|
t.Errorf("working status after stop = %v, want false", ok)
|
|
}
|
|
if !wfc.Closed() {
|
|
t.Errorf("failed to close the underlying connection")
|
|
}
|
|
}
|
|
|
|
// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
|
|
// outgoingConn will close the outgoingConn and fall back to non-working status.
|
|
func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
|
|
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
|
|
defer sw.stop()
|
|
wfc := newFakeWriteFlushCloser(errors.New("blah"))
|
|
sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
|
|
|
|
sw.msgc <- raftpb.Message{}
|
|
select {
|
|
case <-wfc.closed:
|
|
case <-time.After(time.Second):
|
|
t.Errorf("failed to close the underlying connection in time")
|
|
}
|
|
// no longer working
|
|
if _, ok := sw.writec(); ok {
|
|
t.Errorf("working = %v, want false", ok)
|
|
}
|
|
}
|
|
|
|
func TestStreamReaderDialRequest(t *testing.T) {
|
|
for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
|
|
tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
|
|
sr := &streamReader{
|
|
peerID: types.ID(2),
|
|
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
|
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
|
}
|
|
sr.dial(tt)
|
|
|
|
act, err := tr.rec.Wait(1)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
req := act[0].Params[0].(*http.Request)
|
|
|
|
wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1")
|
|
if req.URL.String() != wurl {
|
|
t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
|
|
}
|
|
if w := "GET"; req.Method != w {
|
|
t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
|
|
}
|
|
if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
|
|
t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
|
|
}
|
|
if g := req.Header.Get("X-Raft-To"); g != "2" {
|
|
t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestStreamReaderDialResult tests the result of the dial func call meets the
|
|
// HTTP response received.
|
|
func TestStreamReaderDialResult(t *testing.T) {
|
|
tests := []struct {
|
|
code int
|
|
err error
|
|
wok bool
|
|
whalt bool
|
|
}{
|
|
{0, errors.New("blah"), false, false},
|
|
{http.StatusOK, nil, true, false},
|
|
{http.StatusMethodNotAllowed, nil, false, false},
|
|
{http.StatusNotFound, nil, false, false},
|
|
{http.StatusPreconditionFailed, nil, false, false},
|
|
{http.StatusGone, nil, false, true},
|
|
}
|
|
for i, tt := range tests {
|
|
h := http.Header{}
|
|
h.Add("X-Server-Version", version.Version)
|
|
tr := &respRoundTripper{
|
|
code: tt.code,
|
|
header: h,
|
|
err: tt.err,
|
|
}
|
|
sr := &streamReader{
|
|
peerID: types.ID(2),
|
|
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
|
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
|
errorc: make(chan error, 1),
|
|
}
|
|
|
|
_, err := sr.dial(streamTypeMessage)
|
|
if ok := err == nil; ok != tt.wok {
|
|
t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
|
|
}
|
|
if halt := len(sr.errorc) > 0; halt != tt.whalt {
|
|
t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
|
|
func TestStreamReaderStopOnDial(t *testing.T) {
|
|
defer testutil.AfterTest(t)
|
|
h := http.Header{}
|
|
h.Add("X-Server-Version", version.Version)
|
|
tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
|
|
sr := &streamReader{
|
|
peerID: types.ID(2),
|
|
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
|
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
|
errorc: make(chan error, 1),
|
|
typ: streamTypeMessage,
|
|
status: newPeerStatus(types.ID(2)),
|
|
}
|
|
tr.onResp = func() {
|
|
// stop() waits for the run() goroutine to exit, but that exit
|
|
// needs a response from RoundTrip() first; use goroutine
|
|
go sr.stop()
|
|
// wait so that stop() is blocked on run() exiting
|
|
time.Sleep(10 * time.Millisecond)
|
|
// sr.run() completes dialing then begins decoding while stopped
|
|
}
|
|
sr.start()
|
|
select {
|
|
case <-sr.done:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("streamReader did not stop in time")
|
|
}
|
|
}
|
|
|
|
type respWaitRoundTripper struct {
|
|
rrt *respRoundTripper
|
|
onResp func()
|
|
}
|
|
|
|
func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
resp, err := t.rrt.RoundTrip(req)
|
|
resp.Body = newWaitReadCloser()
|
|
t.onResp()
|
|
return resp, err
|
|
}
|
|
|
|
type waitReadCloser struct{ closec chan struct{} }
|
|
|
|
func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
|
|
func (wrc *waitReadCloser) Read(p []byte) (int, error) {
|
|
<-wrc.closec
|
|
return 0, io.EOF
|
|
}
|
|
func (wrc *waitReadCloser) Close() error {
|
|
close(wrc.closec)
|
|
return nil
|
|
}
|
|
|
|
// TestStreamReaderDialDetectUnsupport tests that dial func could find
|
|
// out that the stream type is not supported by the remote.
|
|
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
|
|
for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
|
|
// the response from etcd 2.0
|
|
tr := &respRoundTripper{
|
|
code: http.StatusNotFound,
|
|
header: http.Header{},
|
|
}
|
|
sr := &streamReader{
|
|
peerID: types.ID(2),
|
|
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
|
|
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
|
|
}
|
|
|
|
_, err := sr.dial(typ)
|
|
if err != errUnsupportedStreamType {
|
|
t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestStream tests that streamReader and streamWriter can build stream to
|
|
// send messages between each other.
|
|
func TestStream(t *testing.T) {
|
|
recvc := make(chan raftpb.Message, streamBufSize)
|
|
propc := make(chan raftpb.Message, streamBufSize)
|
|
msgapp := raftpb.Message{
|
|
Type: raftpb.MsgApp,
|
|
From: 2,
|
|
To: 1,
|
|
Term: 1,
|
|
LogTerm: 1,
|
|
Index: 3,
|
|
Entries: []raftpb.Entry{{Term: 1, Index: 4}},
|
|
}
|
|
|
|
tests := []struct {
|
|
t streamType
|
|
m raftpb.Message
|
|
wc chan raftpb.Message
|
|
}{
|
|
{
|
|
streamTypeMessage,
|
|
raftpb.Message{Type: raftpb.MsgProp, To: 2},
|
|
propc,
|
|
},
|
|
{
|
|
streamTypeMessage,
|
|
msgapp,
|
|
recvc,
|
|
},
|
|
{
|
|
streamTypeMsgAppV2,
|
|
msgapp,
|
|
recvc,
|
|
},
|
|
}
|
|
for i, tt := range tests {
|
|
h := &fakeStreamHandler{t: tt.t}
|
|
srv := httptest.NewServer(h)
|
|
defer srv.Close()
|
|
|
|
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
|
|
defer sw.stop()
|
|
h.sw = sw
|
|
|
|
picker := mustNewURLPicker(t, []string{srv.URL})
|
|
tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
|
|
|
|
sr := &streamReader{
|
|
peerID: types.ID(2),
|
|
typ: tt.t,
|
|
tr: tr,
|
|
picker: picker,
|
|
status: newPeerStatus(types.ID(2)),
|
|
recvc: recvc,
|
|
propc: propc,
|
|
}
|
|
sr.start()
|
|
|
|
// wait for stream to work
|
|
var writec chan<- raftpb.Message
|
|
for {
|
|
var ok bool
|
|
if writec, ok = sw.writec(); ok {
|
|
break
|
|
}
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
|
|
writec <- tt.m
|
|
var m raftpb.Message
|
|
select {
|
|
case m = <-tt.wc:
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("#%d: failed to receive message from the channel", i)
|
|
}
|
|
if !reflect.DeepEqual(m, tt.m) {
|
|
t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
|
|
}
|
|
|
|
sr.stop()
|
|
}
|
|
}
|
|
|
|
func TestCheckStreamSupport(t *testing.T) {
|
|
tests := []struct {
|
|
v *semver.Version
|
|
t streamType
|
|
w bool
|
|
}{
|
|
// support
|
|
{
|
|
semver.Must(semver.NewVersion("2.1.0")),
|
|
streamTypeMsgAppV2,
|
|
true,
|
|
},
|
|
// ignore patch
|
|
{
|
|
semver.Must(semver.NewVersion("2.1.9")),
|
|
streamTypeMsgAppV2,
|
|
true,
|
|
},
|
|
// ignore prerelease
|
|
{
|
|
semver.Must(semver.NewVersion("2.1.0-alpha")),
|
|
streamTypeMsgAppV2,
|
|
true,
|
|
},
|
|
}
|
|
for i, tt := range tests {
|
|
if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
|
|
t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
|
|
}
|
|
}
|
|
}
|
|
|
|
type fakeWriteFlushCloser struct {
|
|
mu sync.Mutex
|
|
err error
|
|
written int
|
|
closed chan struct{}
|
|
writec chan struct{}
|
|
}
|
|
|
|
func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
|
|
return &fakeWriteFlushCloser{
|
|
err: err,
|
|
closed: make(chan struct{}),
|
|
writec: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
|
|
wfc.mu.Lock()
|
|
defer wfc.mu.Unlock()
|
|
select {
|
|
case wfc.writec <- struct{}{}:
|
|
default:
|
|
}
|
|
wfc.written += len(p)
|
|
return len(p), wfc.err
|
|
}
|
|
|
|
func (wfc *fakeWriteFlushCloser) Flush() {}
|
|
|
|
func (wfc *fakeWriteFlushCloser) Close() error {
|
|
close(wfc.closed)
|
|
return wfc.err
|
|
}
|
|
|
|
func (wfc *fakeWriteFlushCloser) Written() int {
|
|
wfc.mu.Lock()
|
|
defer wfc.mu.Unlock()
|
|
return wfc.written
|
|
}
|
|
|
|
func (wfc *fakeWriteFlushCloser) Closed() bool {
|
|
select {
|
|
case <-wfc.closed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
type fakeStreamHandler struct {
|
|
t streamType
|
|
sw *streamWriter
|
|
}
|
|
|
|
func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Add("X-Server-Version", version.Version)
|
|
w.(http.Flusher).Flush()
|
|
c := newCloseNotifier()
|
|
h.sw.attach(&outgoingConn{
|
|
t: h.t,
|
|
Writer: w,
|
|
Flusher: w.(http.Flusher),
|
|
Closer: c,
|
|
})
|
|
<-c.closeNotify()
|
|
}
|