*: detect leaky goroutines, fix leaks

gexpect.Interact leaks. This adds ReadLine method to wait for the leaky
goroutine to accept an EOF.

Fixes https://github.com/coreos/etcd/issues/4258.

Reference: https://github.com/coreos/etcd/pull/4261#issuecomment-174198945.
This commit is contained in:
Gyu-Ho Lee 2016-01-22 16:00:53 -08:00
parent 5a9f81b198
commit 96d2ee20e3
8 changed files with 223 additions and 123 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/testutil"
)
const (
@ -34,6 +35,7 @@ const (
)
func TestBasicOpsNoTLS(t *testing.T) {
defer testutil.AfterTest(t)
testProcessClusterPutGet(
t,
&etcdProcessClusterConfig{
@ -46,6 +48,7 @@ func TestBasicOpsNoTLS(t *testing.T) {
}
func TestBasicOpsAllTLS(t *testing.T) {
defer testutil.AfterTest(t)
testProcessClusterPutGet(
t,
&etcdProcessClusterConfig{
@ -58,6 +61,7 @@ func TestBasicOpsAllTLS(t *testing.T) {
}
func TestBasicOpsPeerTLS(t *testing.T) {
defer testutil.AfterTest(t)
testProcessClusterPutGet(
t,
&etcdProcessClusterConfig{
@ -70,6 +74,7 @@ func TestBasicOpsPeerTLS(t *testing.T) {
}
func TestBasicOpsClientTLS(t *testing.T) {
defer testutil.AfterTest(t)
testProcessClusterPutGet(
t,
&etcdProcessClusterConfig{
@ -175,8 +180,9 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
go func(etcdp *etcdProcess) {
_, err := etcdp.proc.ExpectRegex(readyStr)
readyC <- err
etcdp.proc.ReadUntil('\n') // don't display rest of line
etcdp.proc.Interact()
etcdp.proc.ReadLine()
etcdp.proc.Interact() // this blocks(leaks) if another goroutine is reading
etcdp.proc.ReadLine() // wait for leaky goroutine to accept an EOF
close(etcdp.donec)
}(epc.procs[i])
}

20
e2e/main_test.go Normal file
View File

@ -0,0 +1,20 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package e2e
import (
"os"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}

View File

@ -73,7 +73,7 @@ func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
func testCluster(t *testing.T, size int) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, size)
c.Launch(t)
defer c.Terminate(t)
@ -81,7 +81,7 @@ func testCluster(t *testing.T, size int) {
}
func TestTLSClusterOf3(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
c.Launch(t)
defer c.Terminate(t)
@ -92,7 +92,7 @@ func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1
func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }
func testClusterUsingDiscovery(t *testing.T, size int) {
defer afterTest(t)
defer testutil.AfterTest(t)
dc := NewCluster(t, 1)
dc.Launch(t)
defer dc.Terminate(t)
@ -115,7 +115,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
}
func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
dc := NewCluster(t, 1)
dc.Launch(t)
defer dc.Terminate(t)
@ -143,7 +143,7 @@ func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }
func testDoubleClusterSize(t *testing.T, size int) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, size)
c.Launch(t)
defer c.Terminate(t)
@ -155,7 +155,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
}
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
c.Launch(t)
defer c.Terminate(t)
@ -170,7 +170,7 @@ func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }
func testDecreaseClusterSize(t *testing.T, size int) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, size)
c.Launch(t)
defer c.Terminate(t)
@ -226,7 +226,7 @@ func TestForceNewCluster(t *testing.T) {
}
func TestAddMemberAfterClusterFullRotation(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, 3)
c.Launch(t)
defer c.Terminate(t)
@ -248,7 +248,7 @@ func TestAddMemberAfterClusterFullRotation(t *testing.T) {
// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, 5)
c.Launch(t)
defer c.Terminate(t)
@ -263,7 +263,7 @@ func TestIssue2681(t *testing.T) {
// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, 5)
for _, m := range c.Members {
@ -288,7 +288,7 @@ func TestIssue2746(t *testing.T) {
// Ensure etcd will not panic when removing a just started member.
func TestIssue2904(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
// start 1-member cluster to ensure member 0 is the leader of the cluster.
c := NewCluster(t, 1)
c.Launch(t)

View File

@ -5,119 +5,16 @@
package integration
import (
"fmt"
"net/http"
"os"
"runtime"
"sort"
"strings"
"testing"
"time"
"github.com/coreos/etcd/pkg/testutil"
)
var atLeastGo15 bool = false
func init() {
var major, minor int
var discard string
i, err := fmt.Sscanf(runtime.Version(), "go%d.%d%s", &major, &minor, &discard)
atLeastGo15 = (err == nil && i == 3 && (major > 1 || major == 1 && minor >= 5))
}
func interestingGoroutines() (gs []string) {
buf := make([]byte, 2<<20)
buf = buf[:runtime.Stack(buf, true)]
for _, g := range strings.Split(string(buf), "\n\n") {
sl := strings.SplitN(g, "\n", 2)
if len(sl) != 2 {
continue
}
stack := strings.TrimSpace(sl[1])
if stack == "" ||
strings.Contains(stack, "created by testing.RunTests") ||
strings.Contains(stack, "testing.Main(") ||
strings.Contains(stack, "runtime.goexit") ||
strings.Contains(stack, "github.com/coreos/etcd/integration.interestingGoroutines") ||
strings.Contains(stack, "github.com/coreos/etcd/pkg/logutil.(*MergeLogger).outputLoop") ||
strings.Contains(stack, "github.com/golang/glog.(*loggingT).flushDaemon") ||
strings.Contains(stack, "created by runtime.gc") ||
strings.Contains(stack, "runtime.MHeap_Scavenger") {
continue
}
gs = append(gs, stack)
}
sort.Strings(gs)
return
}
// Verify the other tests didn't leave any goroutines running.
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && goroutineLeaked() {
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}
func goroutineLeaked() bool {
if testing.Short() {
// not counting goroutines for leakage in -short mode
return false
}
gs := interestingGoroutines()
n := 0
stackCount := make(map[string]int)
for _, g := range gs {
stackCount[g]++
n++
}
if n == 0 {
return false
}
fmt.Fprintf(os.Stderr, "Too many goroutines running after integration test(s).\n")
for stack, count := range stackCount {
fmt.Fprintf(os.Stderr, "%d instances of:\n%s\n", count, stack)
}
return true
}
func afterTest(t *testing.T) {
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
if testing.Short() {
return
}
var bad string
badSubstring := map[string]string{
").writeLoop(": "a Transport",
"created by net/http/httptest.(*Server).Start": "an httptest.Server",
"timeoutHandler": "a TimeoutHandler",
"net.(*netFD).connect(": "a timing out dial",
").noteClientGone(": "a closenotifier sender",
}
// readLoop was buggy before go1.5:
// https://github.com/golang/go/issues/10457
if atLeastGo15 {
badSubstring[").readLoop("] = "a Transport"
}
var stacks string
for i := 0; i < 6; i++ {
bad = ""
stacks = strings.Join(interestingGoroutines(), "\n\n")
for substr, what := range badSubstring {
if strings.Contains(stacks, substr) {
bad = what
}
}
if bad == "" {
return
}
// Bad stuff found, but goroutines might just still be
// shutting down, so give it some time.
time.Sleep(50 * time.Millisecond)
}
t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks)
}

View File

@ -23,10 +23,11 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/testutil"
)
func TestPauseMember(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, 5)
c.Launch(t)
defer c.Terminate(t)
@ -44,7 +45,7 @@ func TestPauseMember(t *testing.T) {
}
func TestRestartMember(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
c := NewCluster(t, 3)
c.Launch(t)
defer c.Terminate(t)
@ -81,7 +82,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
}
func TestSnapshotAndRestartMember(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
m := mustNewMember(t, "snapAndRestartTest", false)
m.SnapCount = 100
m.Launch()

View File

@ -17,10 +17,12 @@ package integration
import (
"os/exec"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestUpgradeMember(t *testing.T) {
defer afterTest(t)
defer testutil.AfterTest(t)
m := mustNewMember(t, "integration046", false)
cmd := exec.Command("cp", "-r", "testdata/integration046_data/conf", "testdata/integration046_data/log", "testdata/integration046_data/snapshot", m.DataDir)
err := cmd.Run()

134
pkg/testutil/leak.go Normal file
View File

@ -0,0 +1,134 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testutil
import (
"fmt"
"net/http"
"os"
"runtime"
"sort"
"strings"
"testing"
"time"
)
/*
CheckLeakedGoroutine verifies tests do not leave any leaky
goroutines. It returns true when there are goroutines still
running(leaking) after all tests.
import "github.com/coreos/etcd/pkg/testutil"
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}
func TestSample(t *testing.T) {
defer testutil.AfterTest(t)
...
}
*/
func CheckLeakedGoroutine() bool {
if testing.Short() {
// not counting goroutines for leakage in -short mode
return false
}
gs := interestingGoroutines()
n := 0
stackCount := make(map[string]int)
for _, g := range gs {
stackCount[g]++
n++
}
if n == 0 {
return false
}
fmt.Fprintf(os.Stderr, "Too many goroutines running after all test(s).\n")
for stack, count := range stackCount {
fmt.Fprintf(os.Stderr, "%d instances of:\n%s\n", count, stack)
}
return true
}
func AfterTest(t *testing.T) {
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
if testing.Short() {
return
}
var bad string
badSubstring := map[string]string{
").writeLoop(": "a Transport",
"created by net/http/httptest.(*Server).Start": "an httptest.Server",
"timeoutHandler": "a TimeoutHandler",
"net.(*netFD).connect(": "a timing out dial",
").noteClientGone(": "a closenotifier sender",
}
// readLoop was buggy before go1.5:
// https://github.com/golang/go/issues/10457
if getAtLeastGo15() {
badSubstring[").readLoop("] = "a Transport"
}
var stacks string
for i := 0; i < 6; i++ {
bad = ""
stacks = strings.Join(interestingGoroutines(), "\n\n")
for substr, what := range badSubstring {
if strings.Contains(stacks, substr) {
bad = what
}
}
if bad == "" {
return
}
// Bad stuff found, but goroutines might just still be
// shutting down, so give it some time.
time.Sleep(50 * time.Millisecond)
}
t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks)
}
func interestingGoroutines() (gs []string) {
buf := make([]byte, 2<<20)
buf = buf[:runtime.Stack(buf, true)]
for _, g := range strings.Split(string(buf), "\n\n") {
sl := strings.SplitN(g, "\n", 2)
if len(sl) != 2 {
continue
}
stack := strings.TrimSpace(sl[1])
if stack == "" ||
strings.Contains(stack, "created by testing.RunTests") ||
strings.Contains(stack, "testing.Main(") ||
strings.Contains(stack, "runtime.goexit") ||
strings.Contains(stack, "github.com/coreos/etcd/pkg/testutil.interestingGoroutines") ||
strings.Contains(stack, "github.com/coreos/etcd/pkg/logutil.(*MergeLogger).outputLoop") ||
strings.Contains(stack, "github.com/golang/glog.(*loggingT).flushDaemon") ||
strings.Contains(stack, "created by runtime.gc") ||
strings.Contains(stack, "runtime.MHeap_Scavenger") {
continue
}
gs = append(gs, stack)
}
sort.Strings(gs)
return
}
// getAtLeastGo15 returns true if the runtime has go1.5+.
func getAtLeastGo15() bool {
var major, minor int
var discard string
i, err := fmt.Sscanf(runtime.Version(), "go%d.%d%s", &major, &minor, &discard)
return (err == nil && i == 3 && (major > 1 || major == 1 && minor >= 5))
}

40
pkg/testutil/leak_test.go Normal file
View File

@ -0,0 +1,40 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testutil
import (
"fmt"
"os"
"testing"
)
func TestMain(m *testing.M) {
m.Run()
isLeaked := CheckLeakedGoroutine()
if !isLeaked {
fmt.Fprintln(os.Stderr, "expected leaky goroutines but none is detected")
os.Exit(1)
}
os.Exit(0)
}
func TestSample(t *testing.T) {
defer AfterTest(t)
for range make([]struct{}, 100) {
go func() {
select {}
}()
}
}