mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: add --max-request-bytes flag
This commit is contained in:
parent
b003734be6
commit
9e7740011b
@ -41,6 +41,7 @@ const (
|
|||||||
DefaultMaxSnapshots = 5
|
DefaultMaxSnapshots = 5
|
||||||
DefaultMaxWALs = 5
|
DefaultMaxWALs = 5
|
||||||
DefaultMaxTxnOps = uint(128)
|
DefaultMaxTxnOps = uint(128)
|
||||||
|
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
|
||||||
|
|
||||||
DefaultListenPeerURLs = "http://localhost:2380"
|
DefaultListenPeerURLs = "http://localhost:2380"
|
||||||
DefaultListenClientURLs = "http://localhost:2379"
|
DefaultListenClientURLs = "http://localhost:2379"
|
||||||
@ -87,6 +88,7 @@ type Config struct {
|
|||||||
ElectionMs uint `json:"election-timeout"`
|
ElectionMs uint `json:"election-timeout"`
|
||||||
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
|
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
|
||||||
MaxTxnOps uint `json:"max-txn-ops"`
|
MaxTxnOps uint `json:"max-txn-ops"`
|
||||||
|
MaxRequestBytes uint `json:"max-request-bytes"`
|
||||||
|
|
||||||
// clustering
|
// clustering
|
||||||
|
|
||||||
@ -175,6 +177,7 @@ func NewConfig() *Config {
|
|||||||
Name: DefaultName,
|
Name: DefaultName,
|
||||||
SnapCount: etcdserver.DefaultSnapCount,
|
SnapCount: etcdserver.DefaultSnapCount,
|
||||||
MaxTxnOps: DefaultMaxTxnOps,
|
MaxTxnOps: DefaultMaxTxnOps,
|
||||||
|
MaxRequestBytes: DefaultMaxRequestBytes,
|
||||||
TickMs: 100,
|
TickMs: 100,
|
||||||
ElectionMs: 1000,
|
ElectionMs: 1000,
|
||||||
LPUrls: []url.URL{*lpurl},
|
LPUrls: []url.URL{*lpurl},
|
||||||
|
@ -140,6 +140,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
|||||||
AutoCompactionRetention: cfg.AutoCompactionRetention,
|
AutoCompactionRetention: cfg.AutoCompactionRetention,
|
||||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||||
MaxTxnOps: cfg.MaxTxnOps,
|
MaxTxnOps: cfg.MaxTxnOps,
|
||||||
|
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||||
AuthToken: cfg.AuthToken,
|
AuthToken: cfg.AuthToken,
|
||||||
|
@ -139,6 +139,7 @@ func newConfig() *config {
|
|||||||
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
|
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
|
||||||
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
|
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
|
||||||
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
|
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
|
||||||
|
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
|
||||||
|
|
||||||
// clustering
|
// clustering
|
||||||
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
|
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
|
||||||
|
@ -68,6 +68,8 @@ member flags:
|
|||||||
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
|
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
|
||||||
--max-txn-ops '128'
|
--max-txn-ops '128'
|
||||||
maximum number of operations permitted in a transaction.
|
maximum number of operations permitted in a transaction.
|
||||||
|
--max-request-bytes '1572864'
|
||||||
|
maximum client request size in bytes the server will accept.
|
||||||
|
|
||||||
clustering flags:
|
clustering flags:
|
||||||
|
|
||||||
|
@ -56,6 +56,9 @@ type ServerConfig struct {
|
|||||||
QuotaBackendBytes int64
|
QuotaBackendBytes int64
|
||||||
MaxTxnOps uint
|
MaxTxnOps uint
|
||||||
|
|
||||||
|
// MaxRequestBytes is the maximum request size to send over raft.
|
||||||
|
MaxRequestBytes uint
|
||||||
|
|
||||||
StrictReconfigCheck bool
|
StrictReconfigCheck bool
|
||||||
|
|
||||||
// ClientCertAuthEnabled is true when cert has been signed by the client CA.
|
// ClientCertAuthEnabled is true when cert has been signed by the client CA.
|
||||||
|
@ -83,6 +83,8 @@ const (
|
|||||||
|
|
||||||
// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
|
// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
|
||||||
maxPendingRevokes = 16
|
maxPendingRevokes = 16
|
||||||
|
|
||||||
|
recommendedMaxRequestBytes = 10 * 1024 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -259,6 +261,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
cl *membership.RaftCluster
|
cl *membership.RaftCluster
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
|
||||||
|
plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
|
||||||
|
}
|
||||||
|
|
||||||
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
||||||
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
||||||
}
|
}
|
||||||
|
@ -31,12 +31,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// the max request size that raft accepts.
|
|
||||||
// TODO: make this a flag? But we probably do not want to
|
|
||||||
// accept large request which might block raft stream. User
|
|
||||||
// specify a large value might end up with shooting in the foot.
|
|
||||||
maxRequestBytes = 1.5 * 1024 * 1024
|
|
||||||
|
|
||||||
// In the health case, there might be a small gap (10s of entries) between
|
// In the health case, there might be a small gap (10s of entries) between
|
||||||
// the applied index and committed index.
|
// the applied index and committed index.
|
||||||
// However, if the committed entries are very heavy to apply, the gap might grow.
|
// However, if the committed entries are very heavy to apply, the gap might grow.
|
||||||
@ -605,7 +599,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(data) > maxRequestBytes {
|
if len(data) > int(s.Cfg.MaxRequestBytes) {
|
||||||
return nil, ErrRequestTooLarge
|
return nil, ErrRequestTooLarge
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,6 +61,8 @@ const (
|
|||||||
basePort = 21000
|
basePort = 21000
|
||||||
UrlScheme = "unix"
|
UrlScheme = "unix"
|
||||||
UrlSchemeTLS = "unixs"
|
UrlSchemeTLS = "unixs"
|
||||||
|
|
||||||
|
defaultMaxRequestSize
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -95,6 +97,7 @@ type ClusterConfig struct {
|
|||||||
UseGRPC bool
|
UseGRPC bool
|
||||||
QuotaBackendBytes int64
|
QuotaBackendBytes int64
|
||||||
MaxTxnOps uint
|
MaxTxnOps uint
|
||||||
|
MaxRequestBytes uint
|
||||||
}
|
}
|
||||||
|
|
||||||
type cluster struct {
|
type cluster struct {
|
||||||
@ -227,6 +230,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
|
|||||||
clientTLS: c.cfg.ClientTLS,
|
clientTLS: c.cfg.ClientTLS,
|
||||||
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
quotaBackendBytes: c.cfg.QuotaBackendBytes,
|
||||||
maxTxnOps: c.cfg.MaxTxnOps,
|
maxTxnOps: c.cfg.MaxTxnOps,
|
||||||
|
maxRequestBytes: c.cfg.MaxRequestBytes,
|
||||||
})
|
})
|
||||||
m.DiscoveryURL = c.cfg.DiscoveryURL
|
m.DiscoveryURL = c.cfg.DiscoveryURL
|
||||||
if c.cfg.UseGRPC {
|
if c.cfg.UseGRPC {
|
||||||
@ -494,6 +498,7 @@ type memberConfig struct {
|
|||||||
clientTLS *transport.TLSInfo
|
clientTLS *transport.TLSInfo
|
||||||
quotaBackendBytes int64
|
quotaBackendBytes int64
|
||||||
maxTxnOps uint
|
maxTxnOps uint
|
||||||
|
maxRequestBytes uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustNewMember return an inited member with the given name. If peerTLS is
|
// mustNewMember return an inited member with the given name. If peerTLS is
|
||||||
@ -545,6 +550,10 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
|
|||||||
if m.MaxTxnOps == 0 {
|
if m.MaxTxnOps == 0 {
|
||||||
m.MaxTxnOps = embed.DefaultMaxTxnOps
|
m.MaxTxnOps = embed.DefaultMaxTxnOps
|
||||||
}
|
}
|
||||||
|
m.MaxRequestBytes = mcfg.maxRequestBytes
|
||||||
|
if m.MaxRequestBytes == 0 {
|
||||||
|
m.MaxRequestBytes = embed.DefaultMaxRequestBytes
|
||||||
|
}
|
||||||
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
|
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
@ -1625,6 +1625,35 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestV3PutLargeRequests ensures that configurable MaxRequestBytes works as intended.
|
||||||
|
func TestV3PutLargeRequests(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
tests := []struct {
|
||||||
|
key string
|
||||||
|
maxRequestBytes uint
|
||||||
|
valueSize int
|
||||||
|
expectError error
|
||||||
|
}{
|
||||||
|
// don't set to 0. use 0 as the default.
|
||||||
|
{"foo", 1, 1024, rpctypes.ErrGRPCRequestTooLarge},
|
||||||
|
{"foo", 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
|
||||||
|
{"foo", 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
|
||||||
|
{"foo", 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
|
||||||
|
}
|
||||||
|
for i, test := range tests {
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
|
||||||
|
kvcli := toGRPC(clus.Client(0)).KV
|
||||||
|
reqput := &pb.PutRequest{Key: []byte(test.key), Value: make([]byte, test.valueSize)}
|
||||||
|
_, err := kvcli.Put(context.TODO(), reqput)
|
||||||
|
|
||||||
|
if !eqErrGRPC(err, test.expectError) {
|
||||||
|
t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
clus.Terminate(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func eqErrGRPC(err1 error, err2 error) bool {
|
func eqErrGRPC(err1 error, err2 error) bool {
|
||||||
return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
|
return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user