mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
applyV2 should reapply on backend only once
During review of: https://github.com/etcd-io/etcd/pull/12988 spotted that PUT is actially writing to v3-backend. If we are replaying WAL log, it might happened that backend's applied_index is > than the WAL's log entry. In such situation we should skip applying on backend V3. I think both the methods (setVersion, setMembersAttributes) are in practice idempotent so its not that 'serious' problem, but for formal correctness adding the proper checks.
This commit is contained in:
parent
379c361bc6
commit
c4ebac0c57
@ -36,7 +36,7 @@ const v2Version = "v2"
|
|||||||
type ApplierV2 interface {
|
type ApplierV2 interface {
|
||||||
Delete(r *RequestV2) Response
|
Delete(r *RequestV2) Response
|
||||||
Post(r *RequestV2) Response
|
Post(r *RequestV2) Response
|
||||||
Put(r *RequestV2) Response
|
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
|
||||||
QGet(r *RequestV2) Response
|
QGet(r *RequestV2) Response
|
||||||
Sync(r *RequestV2) Response
|
Sync(r *RequestV2) Response
|
||||||
}
|
}
|
||||||
@ -67,7 +67,7 @@ func (a *applierV2store) Post(r *RequestV2) Response {
|
|||||||
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
|
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV2store) Put(r *RequestV2) Response {
|
func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
|
||||||
ttlOptions := r.TTLOptions()
|
ttlOptions := r.TTLOptions()
|
||||||
exists, existsSet := pbutil.GetBool(r.PrevExist)
|
exists, existsSet := pbutil.GetBool(r.PrevExist)
|
||||||
switch {
|
switch {
|
||||||
@ -89,7 +89,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
|
|||||||
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
|
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
|
||||||
}
|
}
|
||||||
if a.cluster != nil {
|
if a.cluster != nil {
|
||||||
a.cluster.UpdateAttributes(id, attr, true)
|
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
|
||||||
}
|
}
|
||||||
// return an empty response since there is no consumer.
|
// return an empty response since there is no consumer.
|
||||||
return Response{}
|
return Response{}
|
||||||
@ -98,7 +98,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
|
|||||||
if r.Path == membership.StoreClusterVersionKey() {
|
if r.Path == membership.StoreClusterVersionKey() {
|
||||||
if a.cluster != nil {
|
if a.cluster != nil {
|
||||||
// persist to backend given v2store can be very stale
|
// persist to backend given v2store can be very stale
|
||||||
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth)
|
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
|
||||||
}
|
}
|
||||||
return Response{}
|
return Response{}
|
||||||
}
|
}
|
||||||
@ -117,7 +117,7 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
|
|||||||
|
|
||||||
// applyV2Request interprets r as a call to v2store.X
|
// applyV2Request interprets r as a call to v2store.X
|
||||||
// and returns a Response interpreted from v2store.Event
|
// and returns a Response interpreted from v2store.Event
|
||||||
func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
|
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
|
||||||
stringer := panicAlternativeStringer{
|
stringer := panicAlternativeStringer{
|
||||||
stringer: r,
|
stringer: r,
|
||||||
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
|
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
|
||||||
@ -132,7 +132,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
|
|||||||
case "POST":
|
case "POST":
|
||||||
return s.applyV2.Post(r)
|
return s.applyV2.Post(r)
|
||||||
case "PUT":
|
case "PUT":
|
||||||
return s.applyV2.Put(r)
|
return s.applyV2.Put(r, shouldApplyV3)
|
||||||
case "DELETE":
|
case "DELETE":
|
||||||
return s.applyV2.Delete(r)
|
return s.applyV2.Delete(r)
|
||||||
case "QGET":
|
case "QGET":
|
||||||
|
@ -2181,14 +2181,14 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
|||||||
rp := &r
|
rp := &r
|
||||||
pbutil.MustUnmarshal(rp, e.Data)
|
pbutil.MustUnmarshal(rp, e.Data)
|
||||||
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
|
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
|
||||||
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
|
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
||||||
|
|
||||||
if raftReq.V2 != nil {
|
if raftReq.V2 != nil {
|
||||||
req := (*RequestV2)(raftReq.V2)
|
req := (*RequestV2)(raftReq.V2)
|
||||||
s.w.Trigger(req.ID, s.applyV2Request(req))
|
s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,7 +472,7 @@ func TestApplyRequest(t *testing.T) {
|
|||||||
v2store: st,
|
v2store: st,
|
||||||
}
|
}
|
||||||
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
|
||||||
resp := srv.applyV2Request((*RequestV2)(&tt.req))
|
resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth)
|
||||||
|
|
||||||
if !reflect.DeepEqual(resp, tt.wresp) {
|
if !reflect.DeepEqual(resp, tt.wresp) {
|
||||||
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
|
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
|
||||||
@ -500,7 +500,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
|||||||
Path: membership.MemberAttributesStorePath(1),
|
Path: membership.MemberAttributesStorePath(1),
|
||||||
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
|
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
|
||||||
}
|
}
|
||||||
srv.applyV2Request((*RequestV2)(&req))
|
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
|
||||||
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
|
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
|
||||||
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
|
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("attributes = %v, want %v", g, w)
|
t.Errorf("attributes = %v, want %v", g, w)
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
|
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -52,7 +53,7 @@ func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
|
func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
|
||||||
return a.applier.Put(r), nil
|
return a.applier.Put(r, membership.ApplyBoth), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
|
func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user