Merge pull request #6212 from xiang90/readindex

etcdserver: initial read index implementation
This commit is contained in:
Xiang Li 2016-09-27 11:51:08 -05:00 committed by GitHub
commit 150576fa72
4 changed files with 169 additions and 1 deletions

View File

@ -103,6 +103,9 @@ type raftNode struct {
// a chan to send out apply
applyc chan apply
// a chan to send out readState
readStateC chan raft.ReadState
// TODO: remove the etcdserver related logic from raftNode
// TODO: add a state machine interface to apply the commit entries
// and do snapshot/recover
@ -196,6 +199,14 @@ func (r *raftNode) start(s *EtcdServer) {
}
}
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-r.stopped:
return
}
}
raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,

View File

@ -177,12 +177,22 @@ type EtcdServer struct {
snapCount uint64
w wait.Wait
readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
// readwaitC
readwaitc chan struct{}
// readNotifier is used to notify the read routine that it can process the request
// when there is no error
readNotifier *notifier
// stop signals the run goroutine should shutdown.
stop chan struct{}
// stopping is closed by run goroutine on shutdown.
stopping chan struct{}
// done is closed when all goroutines from start() complete.
done chan struct{}
errorc chan error
id types.ID
attributes membership.Attributes
@ -391,6 +401,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
raftStorage: s,
storage: NewStorage(w, ss),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
@ -478,6 +489,7 @@ func (s *EtcdServer) Start() {
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
}
// start prepares and starts server in a new goroutine. It is no longer safe to
@ -493,6 +505,8 @@ func (s *EtcdServer) start() {
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stopping = make(chan struct{})
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
if s.ClusterVersion() != nil {
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
} else {

View File

@ -79,3 +79,19 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
}
return longest, true
}
type notifier struct {
c chan struct{}
err error
}
func newNotifier() *notifier {
return &notifier{
c: make(chan struct{}, 0),
}
}
func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}

View File

@ -15,6 +15,8 @@
package etcdserver
import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
@ -26,6 +28,9 @@ import (
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"
"github.com/coreos/go-semver/semver"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
@ -44,6 +49,10 @@ const (
maxGapBetweenApplyAndCommitIndex = 1000
)
var (
newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0"))
)
type RaftKV interface {
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
@ -86,6 +95,31 @@ type Authenticator interface {
}
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
// TODO: remove this checking when we release etcd 3.2
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
return s.legacyRange(ctx, r)
}
if !r.Serializable {
err := s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
}
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return resp, err
}
// TODO: remove this func when we release etcd 3.2
func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
var resp *pb.RangeResponse
var err error
@ -143,6 +177,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}
return resp, err
}
// TODO: readonly Txn do not need to go through raft
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
@ -641,3 +676,95 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
internalTimeout := time.Second
for {
ctx := make([]byte, 8)
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())
select {
case <-s.readwaitc:
case <-s.stopping:
return
}
nextnr := newNotifier()
s.readMu.Lock()
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()
cctx, cancel := context.WithTimeout(context.Background(), internalTimeout)
if err := s.r.ReadIndex(cctx, ctx); err != nil {
cancel()
if err == raft.ErrStopped {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
nr.notify(err)
continue
}
cancel()
var (
timeout bool
done bool
)
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctx)
if !done {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
}
case <-time.After(internalTimeout):
plog.Warningf("timed out waiting for read index response")
nr.notify(ErrTimeout)
timeout = true
case <-s.stopping:
return
}
}
if !done {
continue
}
if ai := s.getAppliedIndex(); ai < rs.Index {
select {
case <-s.applyWait.Wait(rs.Index):
case <-s.stopping:
return
}
}
// unblock all l-reads requested at indices before rs.Index
nr.notify(nil)
}
}
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
// signal linearizable loop for current notify if it hasn't been already
select {
case s.readwaitc <- struct{}{}:
default:
}
// wait for read state notification
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}