etcdserver: add s.apply

This commit is contained in:
Xiang Li 2014-10-11 21:58:39 +08:00
parent 30c7a7f2dd
commit b53b74733a

View File

@ -223,27 +223,8 @@ func (s *EtcdServer) run() {
// care to apply entries in a single goroutine, and not
// race them.
// TODO: apply configuration change into ClusterStore.
for _, e := range rd.CommittedEntries {
switch e.Type {
case raftpb.EntryNormal:
var r pb.Request
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.w.Trigger(r.ID, s.applyRequest(r))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.applyConfChange(cc)
s.w.Trigger(cc.ID, nil)
default:
panic("unexpected entry type")
}
atomic.StoreUint64(&s.raftIndex, e.Index)
atomic.StoreUint64(&s.raftTerm, e.Term)
appliedi = e.Index
if len(rd.CommittedEntries) != 0 {
appliedi = s.apply(rd.CommittedEntries)
}
if rd.SoftState != nil {
@ -459,6 +440,34 @@ func getExpirationTime(r *pb.Request) time.Time {
return t
}
func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
var applied uint64
for i := range es {
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
var r pb.Request
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.w.Trigger(r.ID, s.applyRequest(r))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.applyConfChange(cc)
s.w.Trigger(cc.ID, nil)
default:
panic("unexpected entry type")
}
atomic.StoreUint64(&s.raftIndex, e.Index)
atomic.StoreUint64(&s.raftTerm, e.Term)
applied = e.Index
}
return applied
}
// applyRequest interprets r as a call to store.X and returns a Response interpreted
// from store.Event
func (s *EtcdServer) applyRequest(r pb.Request) Response {