etcdserver: more methods added

This commit is contained in:
Blake Mizerany 2014-08-26 16:39:26 -07:00 committed by Yicheng Qin
parent 225e618b8f
commit f4d8c3fc66
3 changed files with 27 additions and 20 deletions

View File

@ -34,7 +34,7 @@ type Request struct {
Val string `protobuf:"bytes,4,req,name=val" json:"val"`
Dir bool `protobuf:"varint,5,req,name=dir" json:"dir"`
PrevValue string `protobuf:"bytes,6,req,name=prevValue" json:"prevValue"`
PrevIndex int64 `protobuf:"varint,7,req,name=prevIndex" json:"prevIndex"`
PrevIndex uint64 `protobuf:"varint,7,req,name=prevIndex" json:"prevIndex"`
PrevExists bool `protobuf:"varint,8,req,name=prevExists" json:"prevExists"`
Expiration int64 `protobuf:"varint,9,req,name=expiration" json:"expiration"`
Wait bool `protobuf:"varint,10,req,name=wait" json:"wait"`
@ -199,7 +199,7 @@ func (m *Request) Unmarshal(data []byte) error {
}
b := data[index]
index++
m.PrevIndex |= (int64(b) & 0x7F) << shift
m.PrevIndex |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}

View File

@ -14,7 +14,7 @@ message Request {
required string val = 4 [(gogoproto.nullable) = false];
required bool dir = 5 [(gogoproto.nullable) = false];
required string prevValue = 6 [(gogoproto.nullable) = false];
required int64 prevIndex = 7 [(gogoproto.nullable) = false];
required uint64 prevIndex = 7 [(gogoproto.nullable) = false];
required bool prevExists = 8 [(gogoproto.nullable) = false];
required int64 expiration = 9 [(gogoproto.nullable) = false];
required bool wait = 10 [(gogoproto.nullable) = false];

View File

@ -14,10 +14,10 @@ var ErrUnknownMethod = errors.New("etcdserver: unknown method")
type Response struct {
// The last seen term raft was at when this request was built.
Term int
Term int64
// The last seen index raft was at when this request was built.
Index int
Commit int64
*store.Event
*store.Watcher
@ -53,7 +53,11 @@ func (s *Server) Run(ctx context.Context) {
s.Send(rd.Messages)
go func() {
for _, e := range rd.CommittedEntries {
s.apply(rd, e)
var resp Response
resp.Event, resp.err = s.apply(e)
resp.Term = rd.Term
resp.Commit = rd.Commit
s.w.Trigger(e.Id, resp)
}
}()
case <-ctx.Done():
@ -74,7 +78,7 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
return Response{}, err
}
ch := s.w.Register(r.Id)
s.n.Propose(ctx, data)
s.n.Propose(ctx, r.Id, data)
select {
case x := <-ch:
resp := x.(Response)
@ -103,27 +107,30 @@ func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
}
}
func respond(rd Ready, ev *store.Event, err error) Response {
return Response{Term: rd.Term, Index: rd.Index, Event: ev, err: err}
}
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
func (s *Server) apply(rd Ready, e raft.Entry) Response {
resp := Response{Term: rd.Term, Index: rd.Index}
func (s *Server) apply(e raft.Entry) (*store.Event, error) {
var r Request
if resp.err = r.Unmarshal(e.Data); resp.err != nil {
return resp
if err := r.Unmarshal(e.Data); err != nil {
return nil, err
}
expr := time.Unix(0, r.Expiration)
switch r.Method {
case "POST":
resp.Event, resp.err = st.Create(r.Path, r.Dir, r.Val, true, time.Unix(0, r.Expiration))
return resp
return s.st.Create(r.Path, r.Dir, r.Val, true, expr)
case "PUT":
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return s.st.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)
case r.PrevExists:
// TODO(bmizerany): implement PrevExists
panic("not implemented")
default:
return s.st.Update(r.Path, r.Val, expr)
}
case "DELETE":
panic("not implemented")
default:
return Response{err: ErrUnknownMethod}
return nil, ErrUnknownMethod
}
}