mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16373 from serathius/unify-arguments
server: Unify arguments for mvcc methods
This commit is contained in:
commit
9a6eab2d72
@ -72,8 +72,8 @@ type applierV3 interface {
|
||||
Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result
|
||||
|
||||
Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
||||
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||
DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
||||
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
|
||||
DeleteRange(ctx context.Context, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
|
||||
Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
|
||||
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)
|
||||
|
||||
@ -161,11 +161,11 @@ func (a *applierV3backend) Put(ctx context.Context, p *pb.PutRequest) (resp *pb.
|
||||
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, p)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
return mvcctxn.DeleteRange(a.kv, dr)
|
||||
func (a *applierV3backend) DeleteRange(ctx context.Context, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
|
||||
return mvcctxn.DeleteRange(ctx, a.lg, a.kv, dr)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
func (a *applierV3backend) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
|
||||
return mvcctxn.Range(ctx, a.lg, a.kv, r)
|
||||
}
|
||||
|
||||
|
@ -86,25 +86,25 @@ func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResp
|
||||
return aa.applierV3.Put(ctx, r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
func (aa *authApplierV3) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
|
||||
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return aa.applierV3.Range(ctx, r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) DeleteRange(r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
func (aa *authApplierV3) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
|
||||
if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if r.PrevKv {
|
||||
err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return aa.applierV3.DeleteRange(r)
|
||||
return aa.applierV3.DeleteRange(ctx, r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
||||
|
@ -532,7 +532,7 @@ func TestAuthApplierV3_Range(t *testing.T) {
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
setAuthInfo(authApplier, tc.userName)
|
||||
_, err := authApplier.Range(ctx, tc.request)
|
||||
_, _, err := authApplier.Range(ctx, tc.request)
|
||||
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
|
||||
})
|
||||
}
|
||||
@ -596,7 +596,7 @@ func TestAuthApplierV3_DeleteRange(t *testing.T) {
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
setAuthInfo(authApplier, tc.userName)
|
||||
_, err := authApplier.DeleteRange(tc.request)
|
||||
_, _, err := authApplier.DeleteRange(context.Background(), tc.request)
|
||||
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
|
||||
})
|
||||
}
|
||||
|
@ -32,12 +32,12 @@ func (a *applierV3Corrupt) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResp
|
||||
return nil, nil, errors.ErrCorrupt
|
||||
}
|
||||
|
||||
func (a *applierV3Corrupt) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
return nil, errors.ErrCorrupt
|
||||
func (a *applierV3Corrupt) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
|
||||
return nil, nil, errors.ErrCorrupt
|
||||
}
|
||||
|
||||
func (a *applierV3Corrupt) DeleteRange(_ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
return nil, errors.ErrCorrupt
|
||||
func (a *applierV3Corrupt) DeleteRange(_ context.Context, _ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
|
||||
return nil, nil, errors.ErrCorrupt
|
||||
}
|
||||
|
||||
func (a *applierV3Corrupt) Txn(_ context.Context, _ *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
||||
|
@ -153,13 +153,13 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
|
||||
switch {
|
||||
case r.Range != nil:
|
||||
op = "Range"
|
||||
ar.Resp, ar.Err = a.applyV3.Range(ctx, r.Range)
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(ctx, r.Range)
|
||||
case r.Put != nil:
|
||||
op = "Put"
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, r.Put)
|
||||
case r.DeleteRange != nil:
|
||||
op = "DeleteRange"
|
||||
ar.Resp, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(ctx, r.DeleteRange)
|
||||
case r.Txn != nil:
|
||||
op = "Txn"
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(ctx, r.Txn)
|
||||
|
@ -93,20 +93,30 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func DeleteRange(kv mvcc.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
txnWrite := kv.Write(traceutil.TODO())
|
||||
func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
|
||||
trace = traceutil.Get(ctx)
|
||||
// create delete tracing if the trace in context is empty
|
||||
if trace.IsEmpty() {
|
||||
trace = traceutil.New("delete_range",
|
||||
lg,
|
||||
traceutil.Field{Key: "key", Value: string(dr.Key)},
|
||||
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
|
||||
)
|
||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||
}
|
||||
txnWrite := kv.Write(trace)
|
||||
defer txnWrite.End()
|
||||
resp, err := deleteRange(txnWrite, dr)
|
||||
return resp, err
|
||||
resp, err = deleteRange(ctx, txnWrite, dr)
|
||||
return resp, trace, err
|
||||
}
|
||||
|
||||
func deleteRange(txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
resp := &pb.DeleteRangeResponse{}
|
||||
resp.Header = &pb.ResponseHeader{}
|
||||
end := mkGteRange(dr.RangeEnd)
|
||||
|
||||
if dr.PrevKv {
|
||||
rr, err := txnWrite.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
|
||||
rr, err := txnWrite.Range(ctx, dr.Key, end, mvcc.RangeOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -122,15 +132,16 @@ func deleteRange(txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteR
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
trace := traceutil.Get(ctx)
|
||||
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
|
||||
trace = traceutil.Get(ctx)
|
||||
if trace.IsEmpty() {
|
||||
trace = traceutil.New("range", lg)
|
||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||
}
|
||||
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
|
||||
defer txnRead.End()
|
||||
return executeRange(ctx, lg, txnRead, r)
|
||||
resp, err = executeRange(ctx, lg, txnRead, r)
|
||||
return resp, trace, err
|
||||
}
|
||||
|
||||
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
@ -384,7 +395,7 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
|
||||
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
|
||||
trace.StopSubTrace()
|
||||
case *pb.RequestOp_RequestDeleteRange:
|
||||
resp, err := deleteRange(txnWrite, tv.RequestDeleteRange)
|
||||
resp, err := deleteRange(ctx, txnWrite, tv.RequestDeleteRange)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||
}
|
||||
|
||||
get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
|
||||
get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
err = serr
|
||||
return nil, err
|
||||
|
Loading…
x
Reference in New Issue
Block a user