Merge pull request #4122 from gyuho/watchid_events

*: WatchResponse for multiple Events with WatchID
This commit is contained in:
Gyu-Ho Lee 2016-01-03 16:40:28 -08:00
commit bf9e2a550c
8 changed files with 52 additions and 65 deletions

View File

@ -111,7 +111,7 @@ func (sws *serverWatchStream) recvLoop() error {
func (sws *serverWatchStream) sendLoop() { func (sws *serverWatchStream) sendLoop() {
for { for {
select { select {
case evs, ok := <-sws.watchStream.Chan(): case wresp, ok := <-sws.watchStream.Chan():
if !ok { if !ok {
return return
} }
@ -119,12 +119,13 @@ func (sws *serverWatchStream) sendLoop() {
// TODO: evs is []storagepb.Event type // TODO: evs is []storagepb.Event type
// either return []*storagepb.Event from storage package // either return []*storagepb.Event from storage package
// or define protocol buffer with []storagepb.Event. // or define protocol buffer with []storagepb.Event.
evs := wresp.Events
events := make([]*storagepb.Event, len(evs)) events := make([]*storagepb.Event, len(evs))
for i := range evs { for i := range evs {
events[i] = &evs[i] events[i] = &evs[i]
} }
err := sws.gRPCStream.Send(&pb.WatchResponse{Events: events}) err := sws.gRPCStream.Send(&pb.WatchResponse{WatchId: wresp.WatchID, Events: events})
storage.ReportEventReceived() storage.ReportEventReceived()
if err != nil { if err != nil {
return return

View File

@ -740,7 +740,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("foo"), []byte("bar"))
select { select {
case evs := <-w.Chan(): case resp := <-w.Chan():
wev := storagepb.Event{ wev := storagepb.Event{
Type: storagepb.PUT, Type: storagepb.PUT,
Kv: &storagepb.KeyValue{ Kv: &storagepb.KeyValue{
@ -750,9 +750,11 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 1, ModRevision: 1,
Version: 1, Version: 1,
}, },
WatchID: wid,
} }
ev := evs[0] if resp.WatchID != wid {
t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
}
ev := resp.Events[0]
if !reflect.DeepEqual(ev, wev) { if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev) t.Errorf("watched event = %+v, want %+v", ev, wev)
} }
@ -762,7 +764,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar1")) s.Put([]byte("foo1"), []byte("bar1"))
select { select {
case evs := <-w.Chan(): case resp := <-w.Chan():
wev := storagepb.Event{ wev := storagepb.Event{
Type: storagepb.PUT, Type: storagepb.PUT,
Kv: &storagepb.KeyValue{ Kv: &storagepb.KeyValue{
@ -772,9 +774,11 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 2, ModRevision: 2,
Version: 1, Version: 1,
}, },
WatchID: wid,
} }
ev := evs[0] if resp.WatchID != wid {
t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
}
ev := resp.Events[0]
if !reflect.DeepEqual(ev, wev) { if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev) t.Errorf("watched event = %+v, want %+v", ev, wev)
} }
@ -789,7 +793,7 @@ func TestWatchableKVWatch(t *testing.T) {
defer cancel() defer cancel()
select { select {
case evs := <-w.Chan(): case resp := <-w.Chan():
wev := storagepb.Event{ wev := storagepb.Event{
Type: storagepb.PUT, Type: storagepb.PUT,
Kv: &storagepb.KeyValue{ Kv: &storagepb.KeyValue{
@ -799,9 +803,11 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 2, ModRevision: 2,
Version: 1, Version: 1,
}, },
WatchID: wid,
} }
ev := evs[0] if resp.WatchID != wid {
t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
}
ev := resp.Events[0]
if !reflect.DeepEqual(ev, wev) { if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev) t.Errorf("watched event = %+v, want %+v", ev, wev)
} }
@ -811,7 +817,7 @@ func TestWatchableKVWatch(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar11")) s.Put([]byte("foo1"), []byte("bar11"))
select { select {
case evs := <-w.Chan(): case resp := <-w.Chan():
wev := storagepb.Event{ wev := storagepb.Event{
Type: storagepb.PUT, Type: storagepb.PUT,
Kv: &storagepb.KeyValue{ Kv: &storagepb.KeyValue{
@ -821,9 +827,11 @@ func TestWatchableKVWatch(t *testing.T) {
ModRevision: 3, ModRevision: 3,
Version: 2, Version: 2,
}, },
WatchID: wid,
} }
ev := evs[0] if resp.WatchID != wid {
t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
}
ev := resp.Events[0]
if !reflect.DeepEqual(ev, wev) { if !reflect.DeepEqual(ev, wev) {
t.Errorf("watched event = %+v, want %+v", ev, wev) t.Errorf("watched event = %+v, want %+v", ev, wev)
} }

View File

@ -72,8 +72,6 @@ type Event struct {
// a delete/expire event contains the previous // a delete/expire event contains the previous
// key-value // key-value
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"` Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// watchID is the ID of watching this event is sent to.
WatchID int64 `protobuf:"varint,3,opt,name=watchID,proto3" json:"watchID,omitempty"`
} }
func (m *Event) Reset() { *m = Event{} } func (m *Event) Reset() { *m = Event{} }
@ -167,11 +165,6 @@ func (m *Event) MarshalTo(data []byte) (int, error) {
} }
i += n1 i += n1
} }
if m.WatchID != 0 {
data[i] = 0x18
i++
i = encodeVarintKv(data, i, uint64(m.WatchID))
}
return i, nil return i, nil
} }
@ -242,9 +235,6 @@ func (m *Event) Size() (n int) {
l = m.Kv.Size() l = m.Kv.Size()
n += 1 + l + sovKv(uint64(l)) n += 1 + l + sovKv(uint64(l))
} }
if m.WatchID != 0 {
n += 1 + sovKv(uint64(m.WatchID))
}
return n return n
} }
@ -485,22 +475,6 @@ func (m *Event) Unmarshal(data []byte) error {
return err return err
} }
iNdEx = postIndex iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field WatchID", wireType)
}
m.WatchID = 0
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.WatchID |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default: default:
var sizeOfWire int var sizeOfWire int
for { for {

View File

@ -35,6 +35,4 @@ message Event {
// a delete/expire event contains the previous // a delete/expire event contains the previous
// key-value // key-value
KeyValue kv = 2; KeyValue kv = 2;
// watchID is the ID of watching this event is sent to.
int64 watchID = 3;
} }

View File

@ -33,7 +33,7 @@ const (
) )
type watchable interface { type watchable interface {
watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc)
} }
type watchableStore struct { type watchableStore struct {
@ -181,12 +181,12 @@ func (s *watchableStore) NewWatchStream() WatchStream {
watchStreamGauge.Inc() watchStreamGauge.Inc()
return &watchStream{ return &watchStream{
watchable: s, watchable: s,
ch: make(chan []storagepb.Event, chanBufLen), ch: make(chan WatchResponse, chanBufLen),
cancels: make(map[int64]CancelFunc), cancels: make(map[int64]CancelFunc),
} }
} }
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) { func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -340,8 +340,9 @@ func (s *watchableStore) syncWatchers() {
} }
for w, es := range newWatcherToEventMap(keyToUnsynced, evs) { for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
wr := WatchResponse{WatchID: w.id, Events: es}
select { select {
case w.ch <- es: case w.ch <- wr:
pendingEventsGauge.Add(float64(len(es))) pendingEventsGauge.Add(float64(len(es)))
default: default:
// TODO: handle the full unsynced watchers. // TODO: handle the full unsynced watchers.
@ -374,8 +375,9 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
continue continue
} }
es := we[w] es := we[w]
wr := WatchResponse{WatchID: w.id, Events: es}
select { select {
case w.ch <- es: case w.ch <- wr:
pendingEventsGauge.Add(float64(len(es))) pendingEventsGauge.Add(float64(len(es)))
default: default:
// move slow watcher to unsynced // move slow watcher to unsynced
@ -427,9 +429,9 @@ type watcher struct {
cur int64 cur int64
id int64 id int64
// a chan to send out the watched events. // a chan to send out the watch response.
// The chan might be shared with other watchers. // The chan might be shared with other watchers.
ch chan<- []storagepb.Event ch chan<- WatchResponse
} }
// unsafeAddWatcher puts watcher with key k into watchableStore's synced. // unsafeAddWatcher puts watcher with key k into watchableStore's synced.
@ -475,7 +477,6 @@ func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.E
if !w.prefix && i != len(ev.Kv.Key) { if !w.prefix && i != len(ev.Kv.Key) {
continue continue
} }
ev.WatchID = w.id
if _, ok := watcherToEvents[w]; !ok { if _, ok := watcherToEvents[w]; !ok {
watcherToEvents[w] = []storagepb.Event{} watcherToEvents[w] = []storagepb.Event{}

View File

@ -186,7 +186,8 @@ func TestSyncWatchers(t *testing.T) {
if len(w.(*watchStream).ch) != watcherN { if len(w.(*watchStream).ch) != watcherN {
t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN) t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
} }
evs := <-w.(*watchStream).ch wr := <-w.(*watchStream).ch
evs := wr.Events
if len(evs) != 1 { if len(evs) != 1 {
t.Errorf("len(evs) got = %d, want = 1", len(evs)) t.Errorf("len(evs) got = %d, want = 1", len(evs))
} }

View File

@ -39,8 +39,8 @@ type WatchStream interface {
// TODO: remove the returned CancelFunc. Always use Cancel. // TODO: remove the returned CancelFunc. Always use Cancel.
Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
// Chan returns a chan. All watched events will be sent to the returned chan. // Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan []storagepb.Event Chan() <-chan WatchResponse
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned. // returned.
@ -50,11 +50,18 @@ type WatchStream interface {
Close() Close()
} }
type WatchResponse struct {
// WatchID is the ID of the watcher this response sent to.
WatchID int64
// Events contains all the events that needs to send.
Events []storagepb.Event
}
// watchStream contains a collection of watchers that share // watchStream contains a collection of watchers that share
// one streaming chan to send out watched events and other control events. // one streaming chan to send out watched events and other control events.
type watchStream struct { type watchStream struct {
watchable watchable watchable watchable
ch chan []storagepb.Event ch chan WatchResponse
mu sync.Mutex // guards fields below it mu sync.Mutex // guards fields below it
// nextID is the ID pre-allocated for next new watcher in this stream // nextID is the ID pre-allocated for next new watcher in this stream
@ -80,7 +87,7 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64,
return id, c return id, c
} }
func (ws *watchStream) Chan() <-chan []storagepb.Event { func (ws *watchStream) Chan() <-chan WatchResponse {
return ws.ch return ws.ch
} }

View File

@ -36,12 +36,11 @@ func TestWatcherWatchID(t *testing.T) {
s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("foo"), []byte("bar"))
evs := <-w.Chan() resp := <-w.Chan()
for j, ev := range evs { if resp.WatchID != id {
if ev.WatchID != id { t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id)
}
} }
cancel() cancel()
} }
@ -55,11 +54,9 @@ func TestWatcherWatchID(t *testing.T) {
} }
idm[id] = struct{}{} idm[id] = struct{}{}
evs := <-w.Chan() resp := <-w.Chan()
for j, ev := range evs { if resp.WatchID != id {
if ev.WatchID != id { t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id)
}
} }
cancel() cancel()