mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: remove v2 version set; e2e: fix tests.
This commit is contained in:
parent
ed5a01a48d
commit
7784ca8997
@ -247,7 +247,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
||||
defer c.Unlock()
|
||||
|
||||
c.members, c.removed = membersFromStore(c.lg, c.v2store)
|
||||
c.version = clusterVersionFromBackend(c.lg, c.be)
|
||||
if c.be != nil {
|
||||
c.version = clusterVersionFromBackend(c.lg, c.be)
|
||||
} else {
|
||||
c.version = clusterVersionFromStore(c.lg, c.v2store)
|
||||
}
|
||||
|
||||
mustDetectDowngrade(c.lg, c.version)
|
||||
onSet(c.lg, c.version)
|
||||
|
||||
@ -766,7 +771,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
|
||||
if lg != nil {
|
||||
lg.Panic(
|
||||
"unexpected number of keys when getting cluster version from backend",
|
||||
zap.Int("number fo keys", len(keys)),
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -19,12 +19,10 @@ import (
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/etcdserver/api"
|
||||
"go.etcd.io/etcd/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/pkg/pbutil"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -91,10 +89,8 @@ func (a *applierV2store) Put(r *RequestV2) Response {
|
||||
// return an empty response since there is no consumer.
|
||||
return Response{}
|
||||
}
|
||||
// remove v2 version set to avoid the conflict between v2 and v3.
|
||||
if r.Path == membership.StoreClusterVersionKey() {
|
||||
if a.cluster != nil {
|
||||
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
|
||||
}
|
||||
// return an empty response since there is no consumer.
|
||||
return Response{}
|
||||
}
|
||||
|
@ -732,7 +732,7 @@ func (s *EtcdServer) adjustTicks() {
|
||||
func (s *EtcdServer) Start() {
|
||||
s.start()
|
||||
s.goAttach(func() { s.adjustTicks() })
|
||||
s.goAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
|
||||
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
||||
s.goAttach(s.purgeFile)
|
||||
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
|
||||
s.goAttach(s.monitorVersions)
|
||||
@ -1997,6 +1997,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
|
||||
// with the static clientURLs of the server.
|
||||
// The function keeps attempting to register until it succeeds,
|
||||
// or its server is stopped.
|
||||
// TODO: replace publish() in 3.6
|
||||
func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
req := &membershippb.ClusterMemberAttrSetRequest{
|
||||
Member_ID: uint64(s.id),
|
||||
@ -2005,18 +2006,16 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
ClientUrls: s.attributes.ClientURLs,
|
||||
},
|
||||
}
|
||||
|
||||
lg := s.getLogger()
|
||||
for {
|
||||
select {
|
||||
case <-s.stopping:
|
||||
if lg := s.getLogger(); lg != nil {
|
||||
lg.Warn(
|
||||
"stopped publish because server is stopping",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
)
|
||||
}
|
||||
lg.Warn(
|
||||
"stopped publish because server is stopping",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
)
|
||||
return
|
||||
|
||||
default:
|
||||
@ -2028,27 +2027,23 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
switch err {
|
||||
case nil:
|
||||
close(s.readych)
|
||||
if lg := s.getLogger(); lg != nil {
|
||||
lg.Info(
|
||||
"published local member to cluster through raft",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.String("cluster-id", s.cluster.ID().String()),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
)
|
||||
}
|
||||
lg.Info(
|
||||
"published local member to cluster through raft",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.String("cluster-id", s.cluster.ID().String()),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
)
|
||||
return
|
||||
|
||||
default:
|
||||
if lg := s.getLogger(); lg != nil {
|
||||
lg.Warn(
|
||||
"failed to publish local member to cluster through raft",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
lg.Warn(
|
||||
"failed to publish local member to cluster through raft",
|
||||
zap.String("local-member-id", s.ID().String()),
|
||||
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
|
||||
zap.Duration("publish-timeout", timeout),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2063,7 +2058,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
|
||||
// but does not go through v2 API endpoint, which means even with v2
|
||||
// client handler disabled (e.g. --enable-v2=false), cluster can still
|
||||
// process publish requests through rafthttp
|
||||
// TODO: Deprecate v2 store
|
||||
// TODO: Deprecate v2 store in 3.6
|
||||
func (s *EtcdServer) publish(timeout time.Duration) {
|
||||
b, err := json.Marshal(s.attributes)
|
||||
if err != nil {
|
||||
|
@ -28,7 +28,9 @@ import (
|
||||
func TestCtlV3Migrate(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
epc := setupEtcdctlTest(t, &configNoTLS, false)
|
||||
cfg := configNoTLS
|
||||
cfg.enableV2 = true
|
||||
epc := setupEtcdctlTest(t, &cfg, false)
|
||||
defer func() {
|
||||
if errC := epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
@ -69,10 +71,6 @@ func TestCtlV3Migrate(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// to ensure revision increment is continuous from migrated v2 data
|
||||
if err := ctlV3Put(cx, "test", "value", ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: epc.EndpointsV3(),
|
||||
DialTimeout: 3 * time.Second,
|
||||
@ -85,11 +83,22 @@ func TestCtlV3Migrate(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
revAfterMigrate := resp.Header.Revision
|
||||
// to ensure revision increment is continuous from migrated v2 data
|
||||
if err := ctlV3Put(cx, "test", "value", ""); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
resp, err = cli.Get(context.TODO(), "test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Kvs) != 1 {
|
||||
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
|
||||
}
|
||||
if resp.Kvs[0].CreateRevision != 7 {
|
||||
t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
|
||||
|
||||
if resp.Kvs[0].CreateRevision != revAfterMigrate+1 {
|
||||
t.Fatalf("expected revision increment is continuous from migrated v2, got %d", resp.Kvs[0].CreateRevision)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user