mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: rename watcher to watchStream
Watcher vs Watching in storage pkg is confusing. Watcher should be named as watchStream since it contains a channel as stream to send out events. Then we can rename watching to watcher, which actually watches on a key and send watched events through watchStream. This commits renames watcher to watchStram.
This commit is contained in:
parent
41771d9522
commit
ee0b3f42ed
@ -34,10 +34,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|||||||
closec := make(chan struct{})
|
closec := make(chan struct{})
|
||||||
defer close(closec)
|
defer close(closec)
|
||||||
|
|
||||||
watcher := ws.watchable.NewWatcher()
|
watchStream := ws.watchable.NewWatchStream()
|
||||||
defer watcher.Close()
|
defer watchStream.Close()
|
||||||
|
|
||||||
go sendLoop(stream, watcher, closec)
|
go sendLoop(stream, watchStream, closec)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
req, err := stream.Recv()
|
req, err := stream.Recv()
|
||||||
@ -57,7 +57,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|||||||
toWatch = creq.Prefix
|
toWatch = creq.Prefix
|
||||||
prefix = true
|
prefix = true
|
||||||
}
|
}
|
||||||
watcher.Watch(toWatch, prefix, creq.StartRevision)
|
watchStream.Watch(toWatch, prefix, creq.StartRevision)
|
||||||
default:
|
default:
|
||||||
// TODO: support cancellation
|
// TODO: support cancellation
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
@ -65,10 +65,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
|
func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, closec chan struct{}) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case evs, ok := <-watcher.Chan():
|
case evs, ok := <-watchStream.Chan():
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -90,7 +90,7 @@ func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan
|
|||||||
case <-closec:
|
case <-closec:
|
||||||
// drain the chan to clean up pending events
|
// drain the chan to clean up pending events
|
||||||
for {
|
for {
|
||||||
_, ok := <-watcher.Chan()
|
_, ok := <-watchStream.Chan()
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -82,11 +82,11 @@ type WatchableKV interface {
|
|||||||
Watchable
|
Watchable
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watchable is the interface that wraps the NewWatcher function.
|
// Watchable is the interface that wraps the NewWatchStream function.
|
||||||
type Watchable interface {
|
type Watchable interface {
|
||||||
// NewWatcher returns a Watcher that can be used to
|
// NewWatchStream returns a WatchStream that can be used to
|
||||||
// watch events happened or happending on the KV.
|
// watch events happened or happending on the KV.
|
||||||
NewWatcher() Watcher
|
NewWatchStream() WatchStream
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConsistentWatchableKV is a WatchableKV that understands the consistency
|
// ConsistentWatchableKV is a WatchableKV that understands the consistency
|
||||||
|
@ -733,7 +733,7 @@ func TestWatchableKVWatch(t *testing.T) {
|
|||||||
s := WatchableKV(newWatchableStore(tmpPath))
|
s := WatchableKV(newWatchableStore(tmpPath))
|
||||||
defer cleanup(s, tmpPath)
|
defer cleanup(s, tmpPath)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
|
|
||||||
wid, cancel := w.Watch([]byte("foo"), true, 0)
|
wid, cancel := w.Watch([]byte("foo"), true, 0)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -784,7 +784,7 @@ func TestWatchableKVWatch(t *testing.T) {
|
|||||||
|
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
w = s.NewWatcher()
|
w = s.NewWatchStream()
|
||||||
wid, cancel = w.Watch([]byte("foo1"), false, 1)
|
wid, cancel = w.Watch([]byte("foo1"), false, 1)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -59,12 +59,12 @@ var (
|
|||||||
Help: "Total number of keys.",
|
Help: "Total number of keys.",
|
||||||
})
|
})
|
||||||
|
|
||||||
watcherGauge = prometheus.NewGauge(
|
watchStreamGauge = prometheus.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: "etcd",
|
Namespace: "etcd",
|
||||||
Subsystem: "storage",
|
Subsystem: "storage",
|
||||||
Name: "watcher_total",
|
Name: "watch_stream_total",
|
||||||
Help: "Total number of watchers.",
|
Help: "Total number of watch streams.",
|
||||||
})
|
})
|
||||||
|
|
||||||
watchingGauge = prometheus.NewGauge(
|
watchingGauge = prometheus.NewGauge(
|
||||||
@ -143,7 +143,7 @@ func init() {
|
|||||||
prometheus.MustRegister(deleteCounter)
|
prometheus.MustRegister(deleteCounter)
|
||||||
prometheus.MustRegister(txnCounter)
|
prometheus.MustRegister(txnCounter)
|
||||||
prometheus.MustRegister(keysGauge)
|
prometheus.MustRegister(keysGauge)
|
||||||
prometheus.MustRegister(watcherGauge)
|
prometheus.MustRegister(watchStreamGauge)
|
||||||
prometheus.MustRegister(watchingGauge)
|
prometheus.MustRegister(watchingGauge)
|
||||||
prometheus.MustRegister(slowWatchingGauge)
|
prometheus.MustRegister(slowWatchingGauge)
|
||||||
prometheus.MustRegister(totalEventsCounter)
|
prometheus.MustRegister(totalEventsCounter)
|
||||||
|
@ -177,9 +177,9 @@ func (s *watchableStore) Close() error {
|
|||||||
return s.store.Close()
|
return s.store.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *watchableStore) NewWatcher() Watcher {
|
func (s *watchableStore) NewWatchStream() WatchStream {
|
||||||
watcherGauge.Inc()
|
watchStreamGauge.Inc()
|
||||||
return &watcher{
|
return &watchStream{
|
||||||
watchable: s,
|
watchable: s,
|
||||||
ch: make(chan []storagepb.Event, chanBufLen),
|
ch: make(chan []storagepb.Event, chanBufLen),
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
|||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
s.Put(testKey, testValue)
|
s.Put(testKey, testValue)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
|
|
||||||
const k int = 2
|
const k int = 2
|
||||||
benchSampleN := b.N
|
benchSampleN := b.N
|
||||||
@ -92,7 +92,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
|
|||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
s.Put(testKey, testValue)
|
s.Put(testKey, testValue)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
|
|
||||||
// put 1 million watchers on the same key
|
// put 1 million watchers on the same key
|
||||||
const watcherN = 1000000
|
const watcherN = 1000000
|
||||||
|
@ -33,7 +33,7 @@ func TestWatch(t *testing.T) {
|
|||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
s.Put(testKey, testValue)
|
s.Put(testKey, testValue)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
w.Watch(testKey, true, 0)
|
w.Watch(testKey, true, 0)
|
||||||
|
|
||||||
if _, ok := s.synced[string(testKey)]; !ok {
|
if _, ok := s.synced[string(testKey)]; !ok {
|
||||||
@ -52,7 +52,7 @@ func TestNewWatcherCancel(t *testing.T) {
|
|||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
s.Put(testKey, testValue)
|
s.Put(testKey, testValue)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
_, cancel := w.Watch(testKey, true, 0)
|
_, cancel := w.Watch(testKey, true, 0)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
@ -91,7 +91,7 @@ func TestCancelUnsynced(t *testing.T) {
|
|||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
s.Put(testKey, testValue)
|
s.Put(testKey, testValue)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
|
|
||||||
// arbitrary number for watchers
|
// arbitrary number for watchers
|
||||||
watcherN := 100
|
watcherN := 100
|
||||||
@ -138,7 +138,7 @@ func TestSyncWatchings(t *testing.T) {
|
|||||||
testValue := []byte("bar")
|
testValue := []byte("bar")
|
||||||
s.Put(testKey, testValue)
|
s.Put(testKey, testValue)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
|
|
||||||
// arbitrary number for watchers
|
// arbitrary number for watchers
|
||||||
watcherN := 100
|
watcherN := 100
|
||||||
@ -184,10 +184,10 @@ func TestSyncWatchings(t *testing.T) {
|
|||||||
// All of the watchings actually share one channel
|
// All of the watchings actually share one channel
|
||||||
// so we only need to check one shared channel
|
// so we only need to check one shared channel
|
||||||
// (See watcher.go for more detail).
|
// (See watcher.go for more detail).
|
||||||
if len(w.(*watcher).ch) != watcherN {
|
if len(w.(*watchStream).ch) != watcherN {
|
||||||
t.Errorf("watched event size = %d, want %d", len(w.(*watcher).ch), watcherN)
|
t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
|
||||||
}
|
}
|
||||||
evs := <-w.(*watcher).ch
|
evs := <-w.(*watchStream).ch
|
||||||
if len(evs) != 1 {
|
if len(evs) != 1 {
|
||||||
t.Errorf("len(evs) got = %d, want = 1", len(evs))
|
t.Errorf("len(evs) got = %d, want = 1", len(evs))
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"github.com/coreos/etcd/storage/storagepb"
|
"github.com/coreos/etcd/storage/storagepb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Watcher interface {
|
type WatchStream interface {
|
||||||
// Watch watches the events happening or happened on the given key
|
// Watch watches the events happening or happened on the given key
|
||||||
// or key prefix from the given startRev.
|
// or key prefix from the given startRev.
|
||||||
// The whole event history can be watched unless compacted.
|
// The whole event history can be watched unless compacted.
|
||||||
@ -37,9 +37,9 @@ type Watcher interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// watcher contains a collection of watching that share
|
// watchStream contains a collection of watching that share
|
||||||
// one chan to send out watched events and other control events.
|
// one streaming chan to send out watched events and other control events.
|
||||||
type watcher struct {
|
type watchStream struct {
|
||||||
watchable watchable
|
watchable watchable
|
||||||
ch chan []storagepb.Event
|
ch chan []storagepb.Event
|
||||||
|
|
||||||
@ -50,7 +50,7 @@ type watcher struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: return error if ws is closed?
|
// TODO: return error if ws is closed?
|
||||||
func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
|
func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
|
||||||
ws.mu.Lock()
|
ws.mu.Lock()
|
||||||
defer ws.mu.Unlock()
|
defer ws.mu.Unlock()
|
||||||
if ws.closed {
|
if ws.closed {
|
||||||
@ -67,11 +67,11 @@ func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, can
|
|||||||
return id, c
|
return id, c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *watcher) Chan() <-chan []storagepb.Event {
|
func (ws *watchStream) Chan() <-chan []storagepb.Event {
|
||||||
return ws.ch
|
return ws.ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *watcher) Close() {
|
func (ws *watchStream) Close() {
|
||||||
ws.mu.Lock()
|
ws.mu.Lock()
|
||||||
defer ws.mu.Unlock()
|
defer ws.mu.Unlock()
|
||||||
|
|
||||||
@ -80,5 +80,5 @@ func (ws *watcher) Close() {
|
|||||||
}
|
}
|
||||||
ws.closed = true
|
ws.closed = true
|
||||||
close(ws.ch)
|
close(ws.ch)
|
||||||
watcherGauge.Dec()
|
watchStreamGauge.Dec()
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
|
|||||||
watchable := newWatchableStore(tmpPath)
|
watchable := newWatchableStore(tmpPath)
|
||||||
defer cleanup(watchable, tmpPath)
|
defer cleanup(watchable, tmpPath)
|
||||||
|
|
||||||
w := watchable.NewWatcher()
|
w := watchable.NewWatchStream()
|
||||||
|
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
@ -22,7 +22,7 @@ func TestWatcherWatchID(t *testing.T) {
|
|||||||
s := WatchableKV(newWatchableStore(tmpPath))
|
s := WatchableKV(newWatchableStore(tmpPath))
|
||||||
defer cleanup(s, tmpPath)
|
defer cleanup(s, tmpPath)
|
||||||
|
|
||||||
w := s.NewWatcher()
|
w := s.NewWatchStream()
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
idm := make(map[int64]struct{})
|
idm := make(map[int64]struct{})
|
||||||
|
Loading…
x
Reference in New Issue
Block a user