Merge pull request #1010 from jonboulle/1010_separate_ports

separate ports used for client/server communication
This commit is contained in:
Jonathan Boulle 2014-09-16 14:36:24 -07:00
commit 47b9b5520f
5 changed files with 50 additions and 21 deletions

View File

@ -1,5 +1,5 @@
# Use goreman to run `go get github.com/mattn/goreman`
etcd1: ./etcd -id 0x1 -l :8080 -peers '0x1=localhost:8080&0x2=localhost:8081&0x3=localhost:8082'
etcd2: ./etcd -id 0x2 -l :8081 -peers '0x1=localhost:8080&0x2=localhost:8081&0x3=localhost:8082'
etcd3: ./etcd -id 0x3 -l :8082 -peers '0x1=localhost:8080&0x2=localhost:8081&0x3=localhost:8082'
proxy: ./etcd -proxy-mode -l :4001 -peers '0x1=localhost:8080&0x2=localhost:8081&0x3=localhost:8082'
etcd1: ./etcd -id 0x1 -l 127.0.0.1:4001 -r :7001 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
etcd2: ./etcd -id 0x2 -l 127.0.0.1:4002 -r :7002 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
etcd3: ./etcd -id 0x3 -l 127.0.0.1:4003 -r :7003 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
proxy: ./etcd -proxy-mode -l 127.0.0.1:8080 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'

View File

@ -35,18 +35,17 @@ const (
var errClosed = errors.New("etcdhttp: client closed connection")
// NewHandler generates a muxed http.Handler with the given parameters.
func NewHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
sh := &serverHandler{
timeout: timeout,
server: server,
peers: peers,
timeout: timeout,
}
if sh.timeout == 0 {
sh.timeout = DefaultTimeout
}
mux := http.NewServeMux()
mux.HandleFunc(raftPrefix, sh.serveRaft)
mux.HandleFunc(keysPrefix, sh.serveKeys)
mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
// TODO: dynamic configuration may make this outdated. take care of it.
@ -56,6 +55,17 @@ func NewHandler(server etcdserver.Server, peers Peers, timeout time.Duration) ht
return mux
}
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(server etcdserver.Server) http.Handler {
sh := &serverHandler{
server: server,
}
mux := http.NewServeMux()
mux.HandleFunc(raftPrefix, sh.serveRaft)
mux.HandleFunc("/", http.NotFound)
return mux
}
// serverHandler provides http.Handlers for etcd client and raft communication.
type serverHandler struct {
timeout time.Duration

View File

@ -596,7 +596,7 @@ func TestV2MachinesEndpoint(t *testing.T) {
{"POST", http.StatusMethodNotAllowed},
}
m := NewHandler(nil, Peers{}, time.Hour)
m := NewClientHandler(nil, Peers{}, time.Hour)
s := httptest.NewServer(m)
defer s.Close()

View File

@ -36,7 +36,7 @@ func TestSet(t *testing.T) {
srv.Start()
defer srv.Stop()
h := etcdhttp.NewHandler(srv, nil, time.Hour)
h := etcdhttp.NewClientHandler(srv, nil, time.Hour)
s := httptest.NewServer(h)
defer s.Close()

41
main.go
View File

@ -27,6 +27,7 @@ var (
fid = flag.String("id", "0x1", "ID of this server")
timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout")
laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')")
paddr = flag.String("r", ":2380", "Peer service address (e.g., ':2380')")
dir = flag.String("data-dir", "", "Path to the data directory")
proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.")
@ -41,18 +42,16 @@ func init() {
func main() {
flag.Parse()
var h http.Handler
if *proxyMode {
h = startProxy()
startProxy()
} else {
h = startEtcd()
startEtcd()
}
http.Handle("/", h)
log.Fatal(http.ListenAndServe(*laddr, nil))
}
func startEtcd() http.Handler {
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
// Never returns.
func startEtcd() {
id, err := strconv.ParseInt(*fid, 0, 64)
if err != nil {
log.Fatal(err)
@ -83,7 +82,25 @@ func startEtcd() http.Handler {
Ticker: time.Tick(100 * time.Millisecond),
}
s.Start()
return etcdhttp.NewHandler(s, *peers, *timeout)
ch := etcdhttp.NewClientHandler(s, *peers, *timeout)
ph := etcdhttp.NewPeerHandler(s)
// Start the peer server in a goroutine
go func() {
ps := &http.Server{
Addr: *paddr,
Handler: ph,
}
log.Fatal(ps.ListenAndServe())
}()
// Client server takes over the main goroutine
cs := &http.Server{
Addr: *laddr,
Handler: ch,
}
log.Fatal(cs.ListenAndServe())
}
// startRaft starts a raft node from the given wal dir.
@ -118,11 +135,13 @@ func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
return n, w
}
func startProxy() http.Handler {
// startEtcd launches an HTTP proxy for client communication which proxies to other etcd nodes.
// Never returns.
func startProxy() {
h, err := proxy.NewHandler((*peers).Endpoints())
if err != nil {
log.Fatal(err)
}
return h
http.Handle("/", h)
log.Fatal(http.ListenAndServe(*laddr, h))
}