Merge pull request #12830 from ptabor/20210405-split-pkg

Split out client/pkg as a dedicated, low-dependency module for the client
Piotr Tabor
2021-04-08 00:48:41 +02:00
277 changed files with 794 additions and 320 deletions
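For consumers of these utility packages, the split is essentially an import-path move: packages such as fileutil, testutil, and types now live in the new go.etcd.io/etcd/client/pkg/v3 module instead of go.etcd.io/etcd/pkg/v3, as the import changes in the diff below show. A minimal, illustrative sketch of downstream code after the move (the specific fileutil and types calls are examples chosen here, not taken from this PR):

// Illustrative only: the APIs are unchanged; only the import paths move.
package main

import (
	"fmt"

	"go.etcd.io/etcd/client/pkg/v3/fileutil" // previously go.etcd.io/etcd/pkg/v3/fileutil
	"go.etcd.io/etcd/client/pkg/v3/types"    // previously go.etcd.io/etcd/pkg/v3/types
)

func main() {
	id := types.ID(0x1) // same types.ID as before the split
	fmt.Println("member", id.String(), "data dir writable:", fileutil.IsDirWriteable("/tmp") == nil)
}

Because client/pkg/v3 is published as its own Go module, downstream go.mod files would also need a require entry for go.etcd.io/etcd/client/pkg/v3 in addition to (or instead of) the larger server and pkg modules.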


@@ -27,15 +27,18 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
-	"go.etcd.io/etcd/pkg/v3/fileutil"
+	"go.etcd.io/etcd/api/v3/membershippb"
+	"go.etcd.io/etcd/client/pkg/v3/fileutil"
+	"go.etcd.io/etcd/client/pkg/v3/testutil"
+	"go.etcd.io/etcd/client/pkg/v3/types"
 	"go.etcd.io/etcd/pkg/v3/idutil"
 	"go.etcd.io/etcd/pkg/v3/pbutil"
-	"go.etcd.io/etcd/pkg/v3/testutil"
-	"go.etcd.io/etcd/pkg/v3/types"
 	"go.etcd.io/etcd/pkg/v3/wait"
 	"go.etcd.io/etcd/raft/v3"
 	"go.etcd.io/etcd/raft/v3/raftpb"
+	"go.etcd.io/etcd/server/v3/auth"
 	"go.etcd.io/etcd/server/v3/config"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
@@ -49,6 +52,7 @@ import (
 	"go.etcd.io/etcd/server/v3/mvcc"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
 )
 
 // TestDoLocalAction tests requests which do not need to go through raft to be applied,
@@ -1540,6 +1544,127 @@ func TestPublishRetry(t *testing.T) {
<-ch
}

func TestPublishV3(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that the request has gone through consensus
ch <- &applyResult{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
lg := zaptest.NewLogger(t)
be, _ := backend.NewDefaultTmpBackend(t)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
}
srv.publishV3(time.Hour)
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.InternalRaftRequest
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
assert.Equal(t, &membershippb.ClusterMemberAttrSetRequest{Member_ID: 0x1, MemberAttributes: &membershippb.Attributes{
Name: "node1", ClientUrls: []string{"http://a", "http://b"}}}, r.ClusterMemberAttrSet)
}

// TestPublishV3Stopped tests that publish will be stopped if the server is stopped.
func TestPublishV3Stopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
done: make(chan struct{}),
stopping: make(chan struct{}),
stop: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
ctx: ctx,
cancel: cancel,
}
close(srv.stopping)
srv.publishV3(time.Hour)
}

// TestPublishV3Retry tests that publish will keep retrying until it succeeds.
func TestPublishV3Retry(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
n := newNodeRecorderStream()
lg := zaptest.NewLogger(t)
be, _ := backend.NewDefaultTmpBackend(t)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
}
// expect multiple proposals from retrying
ch := make(chan struct{})
go func() {
defer close(ch)
if action, err := n.Wait(2); err != nil {
t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
}
close(srv.stopping)
// drain remaining actions, if any, so publish can terminate
for {
select {
case <-ch:
return
default:
n.Action()
}
}
}()
srv.publishV3(10 * time.Nanosecond)
ch <- struct{}{}
<-ch
}

func TestStopNotify(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),