mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
[backport 3.5] server: don't panic in readonly serializable txn
Problem: We pass the gRPC context down to the applier in a read-only serializable txn. This context can be cancelled, for example due to a timeout, which triggers a panic inside applyTxn.
Solution: Only panic for transactions with write operations.
Fixes: https://github.com/etcd-io/etcd/issues/14110
Main PR: https://github.com/etcd-io/etcd/pull/14149
Signed-off-by: Bogdan Kanivets <bkanivets@apple.com>
This commit is contained in:
@@ -428,6 +428,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
||||||
|
lg := a.s.Logger()
|
||||||
trace := traceutil.Get(ctx)
|
trace := traceutil.Get(ctx)
|
||||||
if trace.IsEmpty() {
|
if trace.IsEmpty() {
|
||||||
trace = traceutil.New("transaction", a.s.Logger())
|
trace = traceutil.New("transaction", a.s.Logger())
|
||||||
@@ -474,7 +475,18 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
|
|||||||
txn.End()
|
txn.End()
|
||||||
txn = a.s.KV().Write(trace)
|
txn = a.s.KV().Write(trace)
|
||||||
}
|
}
|
||||||
a.applyTxn(ctx, txn, rt, txnPath, txnResp)
|
_, err := a.applyTxn(ctx, txn, rt, txnPath, txnResp)
|
||||||
|
if err != nil {
|
||||||
|
if isWrite {
|
||||||
|
// end txn to release locks before panic
|
||||||
|
txn.End()
|
||||||
|
// When txn with write operations starts it has to be successful
|
||||||
|
// We don't have a way to recover state in case of write failure
|
||||||
|
lg.Panic("unexpected error during txn with writes", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
lg.Error("unexpected error during readonly txn", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
rev := txn.Rev()
|
rev := txn.Rev()
|
||||||
if len(txn.Changes()) != 0 {
|
if len(txn.Changes()) != 0 {
|
||||||
rev++
|
rev++
|
||||||
@@ -486,7 +498,7 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
|
|||||||
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
|
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
|
||||||
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
|
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
|
||||||
)
|
)
|
||||||
return txnResp, trace, nil
|
return txnResp, trace, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// newTxnResp allocates a txn response for a txn request given a path.
|
// newTxnResp allocates a txn response for a txn request given a path.
|
||||||
@@ -617,14 +629,13 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
|
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
|
||||||
trace := traceutil.Get(ctx)
|
trace := traceutil.Get(ctx)
|
||||||
reqs := rt.Success
|
reqs := rt.Success
|
||||||
if !txnPath[0] {
|
if !txnPath[0] {
|
||||||
reqs = rt.Failure
|
reqs = rt.Failure
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := a.s.Logger()
|
|
||||||
for i, req := range reqs {
|
for i, req := range reqs {
|
||||||
respi := tresp.Responses[i].Response
|
respi := tresp.Responses[i].Response
|
||||||
switch tv := req.Request.(type) {
|
switch tv := req.Request.(type) {
|
||||||
@@ -635,7 +646,7 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *
|
|||||||
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
|
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
|
||||||
resp, err := a.Range(ctx, txn, tv.RequestRange)
|
resp, err := a.Range(ctx, txn, tv.RequestRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Panic("unexpected error during txn", zap.Error(err))
|
return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
|
||||||
}
|
}
|
||||||
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
||||||
trace.StopSubTrace()
|
trace.StopSubTrace()
|
||||||
@@ -646,26 +657,30 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *
|
|||||||
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
|
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
|
||||||
resp, _, err := a.Put(ctx, txn, tv.RequestPut)
|
resp, _, err := a.Put(ctx, txn, tv.RequestPut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Panic("unexpected error during txn", zap.Error(err))
|
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
|
||||||
}
|
}
|
||||||
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
|
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
|
||||||
trace.StopSubTrace()
|
trace.StopSubTrace()
|
||||||
case *pb.RequestOp_RequestDeleteRange:
|
case *pb.RequestOp_RequestDeleteRange:
|
||||||
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
|
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Panic("unexpected error during txn", zap.Error(err))
|
return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
|
||||||
}
|
}
|
||||||
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
|
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
|
||||||
case *pb.RequestOp_RequestTxn:
|
case *pb.RequestOp_RequestTxn:
|
||||||
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
|
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
|
||||||
applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
|
applyTxns, err := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
|
||||||
|
if err != nil {
|
||||||
|
// don't wrap the error. It's a recursive call and err should be already wrapped
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
txns += applyTxns + 1
|
txns += applyTxns + 1
|
||||||
txnPath = txnPath[applyTxns+1:]
|
txnPath = txnPath[applyTxns+1:]
|
||||||
default:
|
default:
|
||||||
// empty union
|
// empty union
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return txns
|
return txns, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
|
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
|
||||||
|
|||||||
96
server/etcdserver/apply_test.go
Normal file
96
server/etcdserver/apply_test.go
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
package etcdserver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
|
"go.etcd.io/etcd/server/v3/lease"
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc"
|
||||||
|
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReadonlyTxnError(t *testing.T) {
|
||||||
|
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||||
|
defer betesting.Close(t, b)
|
||||||
|
s := mvcc.New(zap.NewExample(), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
// setup minimal server to get access to applier
|
||||||
|
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeRecorder()})}
|
||||||
|
srv.kv = s
|
||||||
|
srv.be = b
|
||||||
|
|
||||||
|
a := srv.newApplierV3Backend()
|
||||||
|
|
||||||
|
// setup cancelled context
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// put some data to prevent early termination in rangeKeys
|
||||||
|
// we are expecting failure on cancelled context check
|
||||||
|
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||||
|
|
||||||
|
txn := &pb.TxnRequest{
|
||||||
|
Success: []*pb.RequestOp{
|
||||||
|
{
|
||||||
|
Request: &pb.RequestOp_RequestRange{
|
||||||
|
RequestRange: &pb.RangeRequest{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err := a.Txn(ctx, txn)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
|
||||||
|
t.Fatalf("Expected context canceled error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteTxnPanic(t *testing.T) {
|
||||||
|
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||||
|
defer betesting.Close(t, b)
|
||||||
|
s := mvcc.New(zap.NewExample(), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
// setup minimal server to get access to applier
|
||||||
|
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeRecorder()})}
|
||||||
|
srv.kv = s
|
||||||
|
srv.be = b
|
||||||
|
|
||||||
|
a := srv.newApplierV3Backend()
|
||||||
|
|
||||||
|
// setup cancelled context
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// write txn that puts some data and then fails in range due to cancelled context
|
||||||
|
txn := &pb.TxnRequest{
|
||||||
|
Success: []*pb.RequestOp{
|
||||||
|
{
|
||||||
|
Request: &pb.RequestOp_RequestPut{
|
||||||
|
RequestPut: &pb.PutRequest{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Request: &pb.RequestOp_RequestRange{
|
||||||
|
RequestRange: &pb.RangeRequest{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Panics(t, func() { a.Txn(ctx, txn) }, "Expected panic in Txn with writes")
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ package mvcc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||||
@@ -156,7 +157,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
|
|||||||
for i, revpair := range revpairs[:len(kvs)] {
|
for i, revpair := range revpairs[:len(kvs)] {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
revToBytes(revpair, revBytes)
|
revToBytes(revpair, revBytes)
|
||||||
|
|||||||
Reference in New Issue
Block a user