mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17007 from serathius/remove-v2-applier
Remove v2 applier
This commit is contained in:
commit
5426f6d264
@ -16,154 +16,34 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/errors"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/txn"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const v2Version = "v2"
|
||||
|
||||
// ApplierV2 is the interface for processing V2 raft messages
|
||||
type ApplierV2 interface {
|
||||
Delete(r *RequestV2) Response
|
||||
Post(r *RequestV2) Response
|
||||
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
|
||||
QGet(r *RequestV2) Response
|
||||
Sync(r *RequestV2) Response
|
||||
}
|
||||
|
||||
func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
return &applierV2store{lg: lg, store: s, cluster: c}
|
||||
}
|
||||
|
||||
type applierV2store struct {
|
||||
lg *zap.Logger
|
||||
store v2store.Store
|
||||
cluster *membership.RaftCluster
|
||||
}
|
||||
|
||||
func (a *applierV2store) Delete(r *RequestV2) Response {
|
||||
switch {
|
||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
|
||||
default:
|
||||
return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *applierV2store) Post(r *RequestV2) Response {
|
||||
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
|
||||
}
|
||||
|
||||
func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
|
||||
ttlOptions := r.TTLOptions()
|
||||
exists, existsSet := pbutil.GetBool(r.PrevExist)
|
||||
switch {
|
||||
case existsSet:
|
||||
if exists {
|
||||
if r.PrevIndex == 0 && r.PrevValue == "" {
|
||||
return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
|
||||
}
|
||||
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||
}
|
||||
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
|
||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||
default:
|
||||
if storeMemberAttributeRegexp.MatchString(r.Path) {
|
||||
id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
|
||||
var attr membership.Attributes
|
||||
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
||||
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
|
||||
}
|
||||
if a.cluster != nil {
|
||||
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
|
||||
}
|
||||
// return an empty response since there is no consumer.
|
||||
return Response{}
|
||||
}
|
||||
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
|
||||
if r.Path == membership.StoreClusterVersionKey() {
|
||||
if a.cluster != nil {
|
||||
// persist to backend given v2store can be very stale
|
||||
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
|
||||
}
|
||||
return Response{}
|
||||
}
|
||||
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *applierV2store) QGet(r *RequestV2) Response {
|
||||
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
|
||||
}
|
||||
|
||||
func (a *applierV2store) Sync(r *RequestV2) Response {
|
||||
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
|
||||
return Response{}
|
||||
}
|
||||
|
||||
// applyV2Request interprets r as a call to v2store.X
|
||||
// and returns a Response interpreted from v2store.Event
|
||||
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
|
||||
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) {
|
||||
if r.Method != "PUT" || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) {
|
||||
s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
|
||||
}
|
||||
stringer := panicAlternativeStringer{
|
||||
stringer: r,
|
||||
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
|
||||
}
|
||||
defer func(start time.Time) {
|
||||
if !utf8.ValidString(r.Method) {
|
||||
s.lg.Info("method is not valid utf-8")
|
||||
return
|
||||
if storeMemberAttributeRegexp.MatchString(r.Path) {
|
||||
id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
|
||||
var attr membership.Attributes
|
||||
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
||||
s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
|
||||
}
|
||||
if s.cluster != nil {
|
||||
s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
|
||||
}
|
||||
}
|
||||
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
|
||||
if r.Path == membership.StoreClusterVersionKey() {
|
||||
if s.cluster != nil {
|
||||
// persist to backend given v2store can be very stale
|
||||
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
|
||||
}
|
||||
success := resp.Err == nil
|
||||
txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start))
|
||||
txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
|
||||
}(time.Now())
|
||||
|
||||
switch r.Method {
|
||||
case "POST":
|
||||
return s.applyV2.Post(r)
|
||||
case "PUT":
|
||||
return s.applyV2.Put(r, shouldApplyV3)
|
||||
case "DELETE":
|
||||
return s.applyV2.Delete(r)
|
||||
case "QGET":
|
||||
return s.applyV2.QGet(r)
|
||||
case "SYNC":
|
||||
return s.applyV2.Sync(r)
|
||||
default:
|
||||
// This should never be reached, but just in case:
|
||||
return Response{Err: errors.ErrUnknownMethod}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
|
||||
refresh, _ := pbutil.GetBool(r.Refresh)
|
||||
ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
|
||||
if r.Expiration != 0 {
|
||||
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
|
||||
}
|
||||
return ttlOptions
|
||||
}
|
||||
|
||||
func toResponse(ev *v2store.Event, err error) Response {
|
||||
return Response{Event: ev, Err: err}
|
||||
}
|
||||
|
@ -245,8 +245,6 @@ type EtcdServer struct {
|
||||
v2store v2store.Store
|
||||
snapshotter *snap.Snapshotter
|
||||
|
||||
applyV2 ApplierV2
|
||||
|
||||
uberApply apply.UberApplier
|
||||
|
||||
applyWait wait.WaitTime
|
||||
@ -336,7 +334,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
|
||||
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
|
||||
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
||||
|
||||
srv.be = b.storage.backend.be
|
||||
srv.beHooks = b.storage.backend.beHooks
|
||||
@ -1860,14 +1857,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
rp := &r
|
||||
pbutil.MustUnmarshal(rp, e.Data)
|
||||
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
|
||||
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
|
||||
s.applyV2Request((*RequestV2)(rp), shouldApplyV3)
|
||||
s.w.Trigger(r.ID, Response{})
|
||||
return
|
||||
}
|
||||
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
||||
|
||||
if raftReq.V2 != nil {
|
||||
req := (*RequestV2)(raftReq.V2)
|
||||
s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
|
||||
s.applyV2Request(req, shouldApplyV3)
|
||||
s.w.Trigger(req.ID, Response{})
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,6 @@ func TestApplyRepeat(t *testing.T) {
|
||||
consistIndex: cindex.NewFakeConsistentIndex(0),
|
||||
uberApply: uberApplierMock{},
|
||||
}
|
||||
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
|
||||
s.start()
|
||||
req := &pb.InternalRaftRequest{
|
||||
Header: &pb.RequestHeader{ID: 1},
|
||||
@ -159,7 +158,6 @@ func TestV2SetMemberAttributes(t *testing.T) {
|
||||
v2store: mockstore.NewRecorder(),
|
||||
cluster: cl,
|
||||
}
|
||||
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
||||
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
@ -187,7 +185,6 @@ func TestV2SetClusterVersion(t *testing.T) {
|
||||
v2store: mockstore.NewRecorder(),
|
||||
cluster: cl,
|
||||
}
|
||||
srv.applyV2 = NewApplierV2(srv.lg, srv.v2store, srv.cluster)
|
||||
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
@ -580,7 +577,6 @@ func TestSnapshotOrdering(t *testing.T) {
|
||||
consistIndex: ci,
|
||||
beHooks: serverstorage.NewBackendHooks(lg, ci),
|
||||
}
|
||||
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
|
||||
|
||||
s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
|
||||
s.be = be
|
||||
@ -675,7 +671,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
||||
uberApply: uberApplierMock{},
|
||||
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1),
|
||||
}
|
||||
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
|
||||
|
||||
s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
|
||||
s.be = be
|
||||
|
Loading…
x
Reference in New Issue
Block a user