mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Fix 2 sources of leaked memory: embed server HTTP & v3_snapshot.leasser.
This commit is contained in:
@@ -355,14 +355,17 @@ func (s *v3Manager) saveDB() error {
|
|||||||
// update consistentIndex so applies go through on etcdserver despite
|
// update consistentIndex so applies go through on etcdserver despite
|
||||||
// having a new raft instance
|
// having a new raft instance
|
||||||
be := backend.NewDefaultBackend(dbpath)
|
be := backend.NewDefaultBackend(dbpath)
|
||||||
|
defer be.Close()
|
||||||
|
|
||||||
ci := cindex.NewConsistentIndex(be.BatchTx())
|
ci := cindex.NewConsistentIndex(be.BatchTx())
|
||||||
ci.SetConsistentIndex(uint64(commit))
|
ci.SetConsistentIndex(uint64(commit))
|
||||||
|
|
||||||
// a lessor never timeouts leases
|
// a lessor never timeouts leases
|
||||||
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)
|
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)
|
||||||
|
defer lessor.Stop()
|
||||||
|
|
||||||
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
|
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
|
||||||
|
defer mvs.Close()
|
||||||
txn := mvs.Write(traceutil.TODO())
|
txn := mvs.Write(traceutil.TODO())
|
||||||
btx := be.BatchTx()
|
btx := be.BatchTx()
|
||||||
del := func(k, v []byte) error {
|
del := func(k, v []byte) error {
|
||||||
@@ -380,9 +383,6 @@ func (s *v3Manager) saveDB() error {
|
|||||||
txn.End()
|
txn.End()
|
||||||
|
|
||||||
mvs.Commit()
|
mvs.Commit()
|
||||||
mvs.Close()
|
|
||||||
be.Close()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -386,18 +386,14 @@ func (e *Etcd) Close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func stopServers(ctx context.Context, ss *servers) {
|
func stopServers(ctx context.Context, ss *servers) {
|
||||||
shutdownNow := func() {
|
// first, close the http.Server
|
||||||
// first, close the http.Server
|
ss.http.Shutdown(ctx)
|
||||||
ss.http.Shutdown(ctx)
|
|
||||||
// then close grpc.Server; cancels all active RPCs
|
|
||||||
ss.grpc.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not grpc.Server.GracefulStop with TLS enabled etcd server
|
// do not grpc.Server.GracefulStop with TLS enabled etcd server
|
||||||
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
|
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
|
||||||
// and https://github.com/etcd-io/etcd/issues/8916
|
// and https://github.com/etcd-io/etcd/issues/8916
|
||||||
if ss.secure {
|
if ss.secure {
|
||||||
shutdownNow()
|
ss.grpc.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -415,7 +411,7 @@ func stopServers(ctx context.Context, ss *servers) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// took too long, manually close open transports
|
// took too long, manually close open transports
|
||||||
// e.g. watch streams
|
// e.g. watch streams
|
||||||
shutdownNow()
|
ss.grpc.Stop()
|
||||||
|
|
||||||
// concurrent GracefulStop should be interrupted
|
// concurrent GracefulStop should be interrupted
|
||||||
<-ch
|
<-ch
|
||||||
|
|||||||
Reference in New Issue
Block a user