Merge pull request #12804 from ptabor/20210326-v3-publish

server: v2store deprecation: Prepare to use publishV3 instead of publish V2.
This commit is contained in:
Piotr Tabor 2021-04-08 09:22:22 +02:00 committed by GitHub
commit 63c25bf378
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 164 additions and 19 deletions

View File

@ -157,7 +157,7 @@ test-smoke:
test-full:
$(info log-file: test-$(TEST_SUFFIX).log)
PASSES="fmt build unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
PASSES="fmt build release unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
docker-test:
$(info GO_VERSION: $(GO_VERSION))

View File

@ -171,6 +171,12 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
continue
}
if raftReq.ClusterMemberAttrSet != nil {
log.Println("ignoring cluster_member_attr_set")
remove()
continue
}
if v3 || raftReq.Header == nil {
continue
}

View File

@ -221,17 +221,17 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
case r.AuthRoleList != nil:
op = "AuthRoleList"
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
case r.ClusterVersionSet != nil:
case r.ClusterVersionSet != nil: // Implemented in 3.5.x
op = "ClusterVersionSet"
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet"
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet"
op = "DowngradeInfoSet" // Implemented in 3.5.x
a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
default:
panic("not implemented")
a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
}
return ar
}

View File

@ -721,6 +721,8 @@ func (s *EtcdServer) adjustTicks() {
func (s *EtcdServer) Start() {
s.start()
s.GoAttach(func() { s.adjustTicks() })
// TODO: Switch to publishV3 in 3.6.
// Support for cluster_member_set_attr was added in 3.5.
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
@ -1855,7 +1857,6 @@ func (s *EtcdServer) sync(timeout time.Duration) {
// with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
// TODO: replace publish() in 3.6
func (s *EtcdServer) publishV3(timeout time.Duration) {
req := &membershippb.ClusterMemberAttrSetRequest{
Member_ID: uint64(s.id),
@ -1916,7 +1917,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
// but does not go through v2 API endpoint, which means even with v2
// client handler disabled (e.g. --enable-v2=false), cluster can still
// process publish requests through rafthttp
// TODO: Deprecate v2 store in 3.6
// TODO: Remove in 3.6 (start using publishV3)
func (s *EtcdServer) publish(timeout time.Duration) {
lg := s.Logger()
b, err := json.Marshal(s.attributes)

View File

@ -27,7 +27,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/client/pkg/v3/types"
@ -36,6 +38,7 @@ import (
"go.etcd.io/etcd/pkg/v3/wait"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
@ -49,6 +52,7 @@ import (
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
@ -98,7 +102,7 @@ func TestDoLocalAction(t *testing.T) {
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
resp, err := srv.Do(context.TODO(), tt.req)
resp, err := srv.Do(context.Background(), tt.req)
if err != tt.werr {
t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
@ -832,7 +836,7 @@ func TestDoProposalStopped(t *testing.T) {
// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
func TestSync(t *testing.T) {
n := newNodeRecorder()
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(context.Background())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
@ -877,7 +881,7 @@ func TestSync(t *testing.T) {
// after timeout
func TestSyncTimeout(t *testing.T) {
n := newProposalBlockerRecorder()
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(context.Background())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
@ -1313,7 +1317,7 @@ func TestAddMember(t *testing.T) {
}
s.start()
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
_, err := s.AddMember(context.TODO(), m)
_, err := s.AddMember(context.Background(), m)
gaction := n.Action()
s.Stop()
@ -1357,7 +1361,7 @@ func TestRemoveMember(t *testing.T) {
consistIndex: cindex.NewFakeConsistentIndex(0),
}
s.start()
_, err := s.RemoveMember(context.TODO(), 1234)
_, err := s.RemoveMember(context.Background(), 1234)
gaction := n.Action()
s.Stop()
@ -1402,7 +1406,7 @@ func TestUpdateMember(t *testing.T) {
}
s.start()
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
_, err := s.UpdateMember(context.TODO(), wm)
_, err := s.UpdateMember(context.Background(), wm)
gaction := n.Action()
s.Stop()
@ -1426,7 +1430,7 @@ func TestPublish(t *testing.T) {
// simulate that request has gone through consensus
ch <- Response{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(context.Background())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
@ -1475,7 +1479,7 @@ func TestPublish(t *testing.T) {
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(context.Background())
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
@ -1503,7 +1507,7 @@ func TestPublishStopped(t *testing.T) {
// TestPublishRetry tests that publish will keep retry until success.
func TestPublishRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(context.Background())
n := newNodeRecorderStream()
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
@ -1540,6 +1544,127 @@ func TestPublishRetry(t *testing.T) {
<-ch
}
func TestPublishV3(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- &applyResult{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.Background())
lg := zaptest.NewLogger(t)
be, _ := backend.NewDefaultTmpBackend(t)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
}
srv.publishV3(time.Hour)
action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.InternalRaftRequest
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
assert.Equal(t, &membershippb.ClusterMemberAttrSetRequest{Member_ID: 0x1, MemberAttributes: &membershippb.Attributes{
Name: "node1", ClientUrls: []string{"http://a", "http://b"}}}, r.ClusterMemberAttrSet)
}
// TestPublishStopped tests that publish will be stopped if server is stopped.
func TestPublishV3Stopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: newNodeNop(),
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
done: make(chan struct{}),
stopping: make(chan struct{}),
stop: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
ctx: ctx,
cancel: cancel,
}
close(srv.stopping)
srv.publishV3(time.Hour)
}
// TestPublishRetry tests that publish will keep retry until success.
func TestPublishV3Retry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n := newNodeRecorderStream()
lg := zaptest.NewLogger(t)
be, _ := backend.NewDefaultTmpBackend(t)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
}
// expect multiple proposals from retrying
ch := make(chan struct{})
go func() {
defer close(ch)
if action, err := n.Wait(2); err != nil {
t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
}
close(srv.stopping)
// drain remaining actions, if any, so publish can terminate
for {
select {
case <-ch:
return
default:
n.Action()
}
}
}()
srv.publishV3(10 * time.Nanosecond)
ch <- struct{}{}
<-ch
}
func TestStopNotify(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),

View File

@ -23,6 +23,7 @@ require (
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.5.1
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.5

View File

@ -613,7 +613,7 @@ function dep_pass {
function release_pass {
rm -f ./bin/etcd-last-release
# to grab latest patch release; bump this up for every minor release
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.3.*" | head -1)
UPGRADE_VER=$(git tag -l --sort=-version:refname "v3.4.*" | head -1)
if [ -n "$MANUAL_VER" ]; then
# in case, we need to test against different version
UPGRADE_VER=$MANUAL_VER

View File

@ -6,7 +6,9 @@ package e2e
import (
"flag"
"log"
"os"
"path/filepath"
"runtime"
"testing"
@ -36,8 +38,18 @@ func TestMain(m *testing.M) {
os.Setenv("ETCD_UNSUPPORTED_ARCH", runtime.GOARCH)
os.Unsetenv("ETCDCTL_API")
flag.StringVar(&binDir, "bin-dir", "../../bin", "The directory for store etcd and etcdctl binaries.")
flag.StringVar(&certDir, "cert-dir", "../fixtures", "The directory for store certificate files.")
binDirDef, err := filepath.Abs("../../bin")
if err != nil {
log.Fatal(err)
}
certDirDef, err := filepath.Abs("../fixtures")
if err != nil {
log.Fatal(err)
}
flag.StringVar(&binDir, "bin-dir", binDirDef, "The directory for store etcd and etcdctl binaries.")
flag.StringVar(&certDir, "cert-dir", certDirDef, "The directory for store certificate files.")
flag.Parse()
binPath = binDir + "/etcd"