mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

The streaming buffer is used to: 1. hand data over to the I/O goroutine in a non-blocking way; 2. absorb pressure during a temporary network delay; 3. be able to wait on I/O instead of on incoming data under high throughput. The old value of 1024 is too small: it is very likely to fill up and break the stream when suffering a temporary network delay.
216 lines
5.0 KiB
Go
216 lines
5.0 KiB
Go
/*
|
|
Copyright 2014 CoreOS, Inc.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package rafthttp
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
|
)
|
|
|
|
// streamBufSize is the capacity of the buffered channel on which a
// streamServer queues entry batches for its writer goroutine.
const streamBufSize = 4096
|
|
|
|
// WriteFlusher combines io.Writer and http.Flusher: a destination that
// accepts bytes via Write and can be asked to push them out via Flush.
type WriteFlusher interface {
	io.Writer
	http.Flusher
}
|
|
|
|
type streamServer struct {
|
|
to types.ID
|
|
term uint64
|
|
fs *stats.FollowerStats
|
|
q chan []raftpb.Entry
|
|
done chan struct{}
|
|
}
|
|
|
|
func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer {
|
|
s := &streamServer{
|
|
to: to,
|
|
term: term,
|
|
fs: fs,
|
|
q: make(chan []raftpb.Entry, streamBufSize),
|
|
done: make(chan struct{}),
|
|
}
|
|
go s.handle(w)
|
|
log.Printf("rafthttp: stream server to %s at term %d starts", to, term)
|
|
return s
|
|
}
|
|
|
|
func (s *streamServer) send(ents []raftpb.Entry) error {
|
|
select {
|
|
case <-s.done:
|
|
return fmt.Errorf("stopped")
|
|
default:
|
|
}
|
|
select {
|
|
case s.q <- ents:
|
|
return nil
|
|
default:
|
|
log.Printf("rafthttp: streamer reaches maximal serving to %s", s.to)
|
|
return fmt.Errorf("reach maximal serving")
|
|
}
|
|
}
|
|
|
|
func (s *streamServer) stop() {
|
|
close(s.q)
|
|
<-s.done
|
|
}
|
|
|
|
// stopNotify returns a receive-only view of done, the channel that is closed
// when the handle goroutine returns.
func (s *streamServer) stopNotify() <-chan struct{} {
	return s.done
}
|
|
|
|
func (s *streamServer) handle(w WriteFlusher) {
|
|
defer func() {
|
|
close(s.done)
|
|
log.Printf("rafthttp: stream server to %s at term %d is closed", s.to, s.term)
|
|
}()
|
|
|
|
ew := &entryWriter{w: w}
|
|
for ents := range s.q {
|
|
start := time.Now()
|
|
if err := ew.writeEntries(ents); err != nil {
|
|
log.Printf("rafthttp: write ents error: %v", err)
|
|
return
|
|
}
|
|
w.Flush()
|
|
s.fs.Succ(time.Since(start))
|
|
}
|
|
}
|
|
|
|
type streamClient struct {
|
|
id types.ID
|
|
to types.ID
|
|
term uint64
|
|
p Processor
|
|
|
|
closer io.Closer
|
|
done chan struct{}
|
|
}
|
|
|
|
func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
|
|
return &streamClient{
|
|
id: id,
|
|
to: to,
|
|
term: term,
|
|
p: p,
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Dial dials to the remote url, and sends streaming request. If it succeeds,
|
|
// it returns nil error, and the caller should call Handle function to keep
|
|
// receiving appendEntry messages.
|
|
func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error {
|
|
uu, err := url.Parse(u)
|
|
if err != nil {
|
|
return fmt.Errorf("parse url %s error: %v", u, err)
|
|
}
|
|
uu.Path = path.Join(RaftStreamPrefix, s.id.String())
|
|
req, err := http.NewRequest("GET", uu.String(), nil)
|
|
if err != nil {
|
|
return fmt.Errorf("new request to %s error: %v", u, err)
|
|
}
|
|
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
|
|
req.Header.Set("X-Raft-To", s.to.String())
|
|
req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
|
|
resp, err := tr.RoundTrip(req)
|
|
if err != nil {
|
|
return fmt.Errorf("error posting to %q: %v", u, err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
resp.Body.Close()
|
|
return fmt.Errorf("unhandled http status %d", resp.StatusCode)
|
|
}
|
|
s.closer = resp.Body
|
|
go s.handle(resp.Body)
|
|
log.Printf("rafthttp: stream client to %s at term %d starts", s.to, s.term)
|
|
return nil
|
|
}
|
|
|
|
func (s *streamClient) stop() {
|
|
s.closer.Close()
|
|
<-s.done
|
|
}
|
|
|
|
func (s *streamClient) isStopped() bool {
|
|
select {
|
|
case <-s.done:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (s *streamClient) handle(r io.Reader) {
|
|
defer func() {
|
|
close(s.done)
|
|
log.Printf("rafthttp: stream client to %s at term %d is closed", s.to, s.term)
|
|
}()
|
|
|
|
er := &entryReader{r: r}
|
|
for {
|
|
ents, err := er.readEntries()
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
log.Printf("rafthttp: read ents error: %v", err)
|
|
}
|
|
return
|
|
}
|
|
// Considering Commit in MsgApp is not recovered, zero-entry appendEntry
|
|
// messages have no use to raft state machine. Drop it here because
|
|
// we don't have easy way to recover its Index easily.
|
|
if len(ents) == 0 {
|
|
continue
|
|
}
|
|
// The commit index field in appendEntry message is not recovered.
|
|
// The follower updates its commit index through heartbeat.
|
|
msg := raftpb.Message{
|
|
Type: raftpb.MsgApp,
|
|
From: uint64(s.to),
|
|
To: uint64(s.id),
|
|
Term: s.term,
|
|
LogTerm: s.term,
|
|
Index: ents[0].Index - 1,
|
|
Entries: ents,
|
|
}
|
|
if err := s.p.Process(context.TODO(), msg); err != nil {
|
|
log.Printf("rafthttp: process raft message error: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func shouldInitStream(m raftpb.Message) bool {
|
|
return m.Type == raftpb.MsgAppResp && m.Reject == false
|
|
}
|
|
|
|
func canUseStream(m raftpb.Message) bool {
|
|
return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
|
|
}
|