Merge pull request #3348 from xiang90/l

use limited listener from golang
This commit is contained in:
Xiang Li 2015-08-20 22:44:51 -07:00
commit e8e507b29b
6 changed files with 157 additions and 135 deletions

4
Godeps/Godeps.json generated
View File

@ -123,6 +123,10 @@
"ImportPath": "golang.org/x/net/context",
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
},
{
"ImportPath": "golang.org/x/net/netutil",
"Rev": "7dbad50ab5b31073856416cdcfeb2796d682f844"
},
{
"ImportPath": "golang.org/x/oauth2",
"Rev": "3046bc76d6dfd7d3707f6640f85e42d9c4050f50"

View File

@ -0,0 +1,48 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package netutil provides network utility functions, complementing the more
// common ones in the net package.
package netutil
import (
"net"
"sync"
)
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener.
func LimitListener(l net.Listener, n int) net.Listener {
return &limitListener{l, make(chan struct{}, n)}
}
type limitListener struct {
net.Listener
sem chan struct{}
}
func (l *limitListener) acquire() { l.sem <- struct{}{} }
func (l *limitListener) release() { <-l.sem }
func (l *limitListener) Accept() (net.Conn, error) {
l.acquire()
c, err := l.Listener.Accept()
if err != nil {
l.release()
return nil, err
}
return &limitListenerConn{Conn: c, release: l.release}, nil
}
type limitListenerConn struct {
net.Conn
releaseOnce sync.Once
release func()
}
func (l *limitListenerConn) Close() error {
err := l.Conn.Close()
l.releaseOnce.Do(l.release)
return err
}

View File

@ -0,0 +1,103 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.3
// (We only run this test on Go 1.3 because the HTTP client timeout behavior
// was bad in previous releases, causing occasional deadlocks.)
package netutil
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestLimitListener(t *testing.T) {
const (
max = 5
num = 200
)
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen: %v", err)
}
defer l.Close()
l = LimitListener(l, max)
var open int32
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if n := atomic.AddInt32(&open, 1); n > max {
t.Errorf("%d open connections, want <= %d", n, max)
}
defer atomic.AddInt32(&open, -1)
time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body")
}))
var wg sync.WaitGroup
var failed int32
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c := http.Client{Timeout: 3 * time.Second}
r, err := c.Get("http://" + l.Addr().String())
if err != nil {
t.Logf("Get: %v", err)
atomic.AddInt32(&failed, 1)
return
}
defer r.Body.Close()
io.Copy(ioutil.Discard, r.Body)
}()
}
wg.Wait()
// We expect some Gets to fail as the kernel's accept queue is filled,
// but most should succeed.
if failed >= num/2 {
t.Errorf("too many Gets failed: %v", failed)
}
}
type errorListener struct {
net.Listener
}
func (errorListener) Accept() (net.Conn, error) {
return nil, errFake
}
var errFake = errors.New("fake error from errorListener")
// This used to hang.
func TestLimitListenerError(t *testing.T) {
donec := make(chan bool, 1)
go func() {
const n = 2
ll := LimitListener(errorListener{}, n)
for i := 0; i < n+1; i++ {
_, err := ll.Accept()
if err != errFake {
t.Fatalf("Accept error = %v; want errFake", err)
}
}
donec <- true
}()
select {
case <-donec:
case <-time.After(5 * time.Second):
t.Fatal("timeout. deadlock?")
}
}

View File

@ -31,6 +31,7 @@ import (
systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/netutil"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver"
@ -227,7 +228,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
if fdLimit <= reservedInternalFDNum {
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
}
l = &transport.LimitedConnListener{Listener: l, RuntimeFDLimit: fdLimit - reservedInternalFDNum}
l = netutil.LimitListener(l, int(fdLimit-reservedInternalFDNum))
}
urlStr := u.String()

View File

@ -1,55 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
import (
"errors"
"net"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/pkg/runtime"
)
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd/pkg", "transport")
type LimitedConnListener struct {
net.Listener
RuntimeFDLimit uint64
}
func (l *LimitedConnListener) Accept() (net.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
n, err := runtime.FDUsage()
// Check whether fd number in use exceeds the set limit.
if err == nil && n >= l.RuntimeFDLimit {
conn.Close()
plog.Errorf("accept error: closing connection, exceed file descriptor usage limitation (fd limit=%d)", l.RuntimeFDLimit)
return nil, &acceptError{error: errors.New("exceed file descriptor usage limitation"), temporary: true}
}
return conn, nil
}
type acceptError struct {
error
temporary bool
}
func (e *acceptError) Timeout() bool { return false }
func (e *acceptError) Temporary() bool { return e.temporary }

View File

@ -1,79 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
import (
"net"
"net/http"
"net/http/httptest"
"testing"
"github.com/coreos/etcd/pkg/runtime"
)
func TestLimitedConnListenerAccept(t *testing.T) {
if _, err := runtime.FDUsage(); err != nil {
t.Skip("skip test due to unsupported runtime.FDUsage")
}
ln, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
fdNum, err := runtime.FDUsage()
if err != nil {
t.Fatal(err)
}
srv := &httptest.Server{
Listener: &LimitedConnListener{
Listener: ln,
RuntimeFDLimit: fdNum + 100,
},
Config: &http.Server{},
}
srv.Start()
defer srv.Close()
resp, err := http.Get(srv.URL)
defer resp.Body.Close()
if err != nil {
t.Fatalf("Get error = %v, want nil", err)
}
}
func TestLimitedConnListenerLimit(t *testing.T) {
if _, err := runtime.FDUsage(); err != nil {
t.Skip("skip test due to unsupported runtime.FDUsage")
}
ln, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
srv := &httptest.Server{
Listener: &LimitedConnListener{
Listener: ln,
RuntimeFDLimit: 0,
},
Config: &http.Server{},
}
srv.Start()
defer srv.Close()
_, err = http.Get(srv.URL)
if err == nil {
t.Fatalf("unexpected nil Get error")
}
}