mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #14149 from lavacat/main-txn-panic
server: don't panic in readonly serializable txn
This commit is contained in:
commit
fff5d00ccf
@ -17,6 +17,7 @@ package txn
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
@ -265,7 +266,18 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
txnWrite.End()
|
||||
txnWrite = kv.Write(trace)
|
||||
}
|
||||
applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
|
||||
_, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
|
||||
if err != nil {
|
||||
if isWrite {
|
||||
// end txn to release locks before panic
|
||||
txnWrite.End()
|
||||
// When txn with write operations starts it has to be successful
|
||||
// We don't have a way to recover state in case of write failure
|
||||
lg.Panic("unexpected error during txn with writes", zap.Error(err))
|
||||
} else {
|
||||
lg.Error("unexpected error during readonly txn", zap.Error(err))
|
||||
}
|
||||
}
|
||||
rev := txnWrite.Rev()
|
||||
if len(txnWrite.Changes()) != 0 {
|
||||
rev++
|
||||
@ -277,7 +289,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
|
||||
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
|
||||
)
|
||||
return txnResp, trace, nil
|
||||
return txnResp, trace, err
|
||||
}
|
||||
|
||||
// newTxnResp allocates a txn response for a txn request given a path.
|
||||
@ -311,7 +323,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn
|
||||
return txnResp, txnCount
|
||||
}
|
||||
|
||||
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
|
||||
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
|
||||
trace := traceutil.Get(ctx)
|
||||
reqs := rt.Success
|
||||
if !txnPath[0] {
|
||||
@ -328,7 +340,7 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
|
||||
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
|
||||
resp, err := Range(ctx, lg, kv, txnWrite, tv.RequestRange)
|
||||
if err != nil {
|
||||
lg.Panic("unexpected error during txnWrite", zap.Error(err))
|
||||
return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
|
||||
}
|
||||
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
|
||||
trace.StopSubTrace()
|
||||
@ -339,26 +351,30 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
|
||||
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
|
||||
resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut)
|
||||
if err != nil {
|
||||
lg.Panic("unexpected error during txnWrite", zap.Error(err))
|
||||
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
|
||||
}
|
||||
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
|
||||
trace.StopSubTrace()
|
||||
case *pb.RequestOp_RequestDeleteRange:
|
||||
resp, err := DeleteRange(kv, txnWrite, tv.RequestDeleteRange)
|
||||
if err != nil {
|
||||
lg.Panic("unexpected error during txnWrite", zap.Error(err))
|
||||
return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
|
||||
}
|
||||
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
|
||||
case *pb.RequestOp_RequestTxn:
|
||||
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
|
||||
applyTxns := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp)
|
||||
applyTxns, err := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp)
|
||||
if err != nil {
|
||||
// don't wrap the error. It's a recursive call and err should be already wrapped
|
||||
return 0, err
|
||||
}
|
||||
txns += applyTxns + 1
|
||||
txnPath = txnPath[applyTxns+1:]
|
||||
default:
|
||||
// empty union
|
||||
}
|
||||
}
|
||||
return txns
|
||||
return txns, nil
|
||||
}
|
||||
|
||||
//---------------------------------------------------------
|
||||
|
81
server/etcdserver/txn/txn_test.go
Normal file
81
server/etcdserver/txn/txn_test.go
Normal file
@ -0,0 +1,81 @@
|
||||
package txn
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestReadonlyTxnError(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
defer betesting.Close(t, b)
|
||||
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
|
||||
defer s.Close()
|
||||
|
||||
// setup cancelled context
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
cancel()
|
||||
|
||||
// put some data to prevent early termination in rangeKeys
|
||||
// we are expecting failure on cancelled context check
|
||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||
|
||||
txn := &pb.TxnRequest{
|
||||
Success: []*pb.RequestOp{
|
||||
{
|
||||
Request: &pb.RequestOp_RequestRange{
|
||||
RequestRange: &pb.RangeRequest{
|
||||
Key: []byte("foo"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
|
||||
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
|
||||
t.Fatalf("Expected context canceled error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteTxnPanic(t *testing.T) {
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
defer betesting.Close(t, b)
|
||||
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
|
||||
defer s.Close()
|
||||
|
||||
// setup cancelled context
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
cancel()
|
||||
|
||||
// write txn that puts some data and then fails in range due to cancelled context
|
||||
txn := &pb.TxnRequest{
|
||||
Success: []*pb.RequestOp{
|
||||
{
|
||||
Request: &pb.RequestOp_RequestPut{
|
||||
RequestPut: &pb.PutRequest{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Request: &pb.RequestOp_RequestRange{
|
||||
RequestRange: &pb.RangeRequest{
|
||||
Key: []byte("foo"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Panics(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
|
||||
}
|
@ -16,6 +16,7 @@ package mvcc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||
@ -94,7 +95,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
|
||||
for i, revpair := range revpairs[:len(kvs)] {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
revToBytes(revpair, revBytes)
|
||||
|
Loading…
x
Reference in New Issue
Block a user