mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #7976 from fanminshi/make_maxOpsPerTxn_configurable
etcdserver: add --max-txn-ops flag
This commit is contained in:
commit
b003734be6
@ -20,7 +20,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
"github.com/coreos/etcd/embed"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
"github.com/coreos/etcd/integration"
|
"github.com/coreos/etcd/integration"
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
@ -41,7 +41,7 @@ func TestTxnError(t *testing.T) {
|
|||||||
t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err)
|
t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ops := make([]clientv3.Op, v3rpc.MaxOpsPerTxn+10)
|
ops := make([]clientv3.Op, int(embed.DefaultMaxTxnOps+10))
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "")
|
ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "")
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@ const (
|
|||||||
DefaultName = "default"
|
DefaultName = "default"
|
||||||
DefaultMaxSnapshots = 5
|
DefaultMaxSnapshots = 5
|
||||||
DefaultMaxWALs = 5
|
DefaultMaxWALs = 5
|
||||||
|
DefaultMaxTxnOps = uint(128)
|
||||||
|
|
||||||
DefaultListenPeerURLs = "http://localhost:2380"
|
DefaultListenPeerURLs = "http://localhost:2380"
|
||||||
DefaultListenClientURLs = "http://localhost:2379"
|
DefaultListenClientURLs = "http://localhost:2379"
|
||||||
@ -85,6 +86,7 @@ type Config struct {
|
|||||||
TickMs uint `json:"heartbeat-interval"`
|
TickMs uint `json:"heartbeat-interval"`
|
||||||
ElectionMs uint `json:"election-timeout"`
|
ElectionMs uint `json:"election-timeout"`
|
||||||
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
|
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
|
||||||
|
MaxTxnOps uint `json:"max-txn-ops"`
|
||||||
|
|
||||||
// clustering
|
// clustering
|
||||||
|
|
||||||
@ -172,6 +174,7 @@ func NewConfig() *Config {
|
|||||||
MaxWalFiles: DefaultMaxWALs,
|
MaxWalFiles: DefaultMaxWALs,
|
||||||
Name: DefaultName,
|
Name: DefaultName,
|
||||||
SnapCount: etcdserver.DefaultSnapCount,
|
SnapCount: etcdserver.DefaultSnapCount,
|
||||||
|
MaxTxnOps: DefaultMaxTxnOps,
|
||||||
TickMs: 100,
|
TickMs: 100,
|
||||||
ElectionMs: 1000,
|
ElectionMs: 1000,
|
||||||
LPUrls: []url.URL{*lpurl},
|
LPUrls: []url.URL{*lpurl},
|
||||||
|
@ -139,6 +139,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||||||
ElectionTicks: cfg.ElectionTicks(),
|
ElectionTicks: cfg.ElectionTicks(),
|
||||||
AutoCompactionRetention: cfg.AutoCompactionRetention,
|
AutoCompactionRetention: cfg.AutoCompactionRetention,
|
||||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||||
|
MaxTxnOps: cfg.MaxTxnOps,
|
||||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||||
AuthToken: cfg.AuthToken,
|
AuthToken: cfg.AuthToken,
|
||||||
|
@ -138,6 +138,7 @@ func newConfig() *config {
|
|||||||
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
|
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
|
||||||
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
|
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
|
||||||
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
|
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
|
||||||
|
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
|
||||||
|
|
||||||
// clustering
|
// clustering
|
||||||
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
|
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
|
||||||
|
@ -66,6 +66,8 @@ member flags:
|
|||||||
comma-separated whitelist of origins for CORS (cross-origin resource sharing).
|
comma-separated whitelist of origins for CORS (cross-origin resource sharing).
|
||||||
--quota-backend-bytes '0'
|
--quota-backend-bytes '0'
|
||||||
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
|
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
|
||||||
|
--max-txn-ops '128'
|
||||||
|
maximum number of operations permitted in a transaction.
|
||||||
|
|
||||||
clustering flags:
|
clustering flags:
|
||||||
|
|
||||||
|
@ -27,19 +27,20 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/api/v3rpc")
|
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/api/v3rpc")
|
||||||
|
|
||||||
// Max operations per txn list. For example, Txn.Success can have at most 128 operations,
|
|
||||||
// and Txn.Failure can have at most 128 operations.
|
|
||||||
MaxOpsPerTxn = 128
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type kvServer struct {
|
type kvServer struct {
|
||||||
hdr header
|
hdr header
|
||||||
kv etcdserver.RaftKV
|
kv etcdserver.RaftKV
|
||||||
|
// maxTxnOps is the max operations per txn.
|
||||||
|
// e.g suppose maxTxnOps = 128.
|
||||||
|
// Txn.Success can have at most 128 operations,
|
||||||
|
// and Txn.Failure can have at most 128 operations.
|
||||||
|
maxTxnOps uint
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
|
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
|
||||||
return &kvServer{hdr: newHeader(s), kv: s}
|
return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
@ -94,7 +95,7 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
if err := checkTxnRequest(r); err != nil {
|
if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,8 +151,8 @@ func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkTxnRequest(r *pb.TxnRequest) error {
|
func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
|
||||||
if len(r.Compare) > MaxOpsPerTxn || len(r.Success) > MaxOpsPerTxn || len(r.Failure) > MaxOpsPerTxn {
|
if len(r.Compare) > maxTxnOps || len(r.Success) > maxTxnOps || len(r.Failure) > maxTxnOps {
|
||||||
return rpctypes.ErrGRPCTooManyOps
|
return rpctypes.ErrGRPCTooManyOps
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@ type ServerConfig struct {
|
|||||||
|
|
||||||
AutoCompactionRetention int
|
AutoCompactionRetention int
|
||||||
QuotaBackendBytes int64
|
QuotaBackendBytes int64
|
||||||
|
MaxTxnOps uint
|
||||||
|
|
||||||
StrictReconfigCheck bool
|
StrictReconfigCheck bool
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/client"
|
"github.com/coreos/etcd/client"
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
"github.com/coreos/etcd/embed"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3client"
|
"github.com/coreos/etcd/etcdserver/api/v3client"
|
||||||
@ -93,6 +94,7 @@ type ClusterConfig struct {
|
|||||||
DiscoveryURL string
|
DiscoveryURL string
|
||||||
UseGRPC bool
|
UseGRPC bool
|
||||||
QuotaBackendBytes int64
|
QuotaBackendBytes int64
|
||||||
|
MaxTxnOps uint
|
||||||
}
|
}
|
||||||
|
|
||||||
type cluster struct {
|
type cluster struct {
|
||||||
@ -224,6 +226,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
|
|||||||
peerTLS: c.cfg.PeerTLS,
|
peerTLS: c.cfg.PeerTLS,
|
||||||
clientTLS: c.cfg.ClientTLS,
|
clientTLS: c.cfg.ClientTLS,
|
||||||
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
||||||
|
maxTxnOps: c.cfg.MaxTxnOps,
|
||||||
})
|
})
|
||||||
m.DiscoveryURL = c.cfg.DiscoveryURL
|
m.DiscoveryURL = c.cfg.DiscoveryURL
|
||||||
if c.cfg.UseGRPC {
|
if c.cfg.UseGRPC {
|
||||||
@ -490,6 +493,7 @@ type memberConfig struct {
|
|||||||
peerTLS *transport.TLSInfo
|
peerTLS *transport.TLSInfo
|
||||||
clientTLS *transport.TLSInfo
|
clientTLS *transport.TLSInfo
|
||||||
quotaBackendBytes int64
|
quotaBackendBytes int64
|
||||||
|
maxTxnOps uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustNewMember return an inited member with the given name. If peerTLS is
|
// mustNewMember return an inited member with the given name. If peerTLS is
|
||||||
@ -537,6 +541,10 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
|
|||||||
m.ElectionTicks = electionTicks
|
m.ElectionTicks = electionTicks
|
||||||
m.TickMs = uint(tickDuration / time.Millisecond)
|
m.TickMs = uint(tickDuration / time.Millisecond)
|
||||||
m.QuotaBackendBytes = mcfg.quotaBackendBytes
|
m.QuotaBackendBytes = mcfg.quotaBackendBytes
|
||||||
|
m.MaxTxnOps = mcfg.maxTxnOps
|
||||||
|
if m.MaxTxnOps == 0 {
|
||||||
|
m.MaxTxnOps = embed.DefaultMaxTxnOps
|
||||||
|
}
|
||||||
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
|
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
@ -150,7 +149,8 @@ func TestV3CompactCurrentRev(t *testing.T) {
|
|||||||
|
|
||||||
func TestV3TxnTooManyOps(t *testing.T) {
|
func TestV3TxnTooManyOps(t *testing.T) {
|
||||||
defer testutil.AfterTest(t)
|
defer testutil.AfterTest(t)
|
||||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
maxTxnOps := uint(128)
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
kvc := toGRPC(clus.RandClient()).KV
|
kvc := toGRPC(clus.RandClient()).KV
|
||||||
@ -201,7 +201,7 @@ func TestV3TxnTooManyOps(t *testing.T) {
|
|||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
txn := &pb.TxnRequest{}
|
txn := &pb.TxnRequest{}
|
||||||
for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ {
|
for j := 0; j < int(maxTxnOps+1); j++ {
|
||||||
tt(txn)
|
tt(txn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user