mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
contrib/recipes: use clientv3 watcher API
This commit is contained in:
parent
f8c998906c
commit
936e991f9f
@ -26,6 +26,7 @@ var (
|
||||
ErrKeyExists = errors.New("key already exists")
|
||||
ErrWaitMismatch = errors.New("unexpected wait result")
|
||||
ErrTooManyClients = errors.New("too many clients")
|
||||
ErrNoWatcher = errors.New("no watcher channel")
|
||||
)
|
||||
|
||||
// deleteRevKey deletes a key by revision, returning false if key is missing
|
||||
|
@ -17,138 +17,41 @@ package recipe
|
||||
import (
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/storage"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
type Watcher struct {
|
||||
wstream pb.Watch_WatchClient
|
||||
cancel context.CancelFunc
|
||||
donec chan struct{}
|
||||
id storage.WatchID
|
||||
recvc chan *storagepb.Event
|
||||
lastErr error
|
||||
}
|
||||
|
||||
func NewWatcher(c *clientv3.Client, key string, rev int64) (*Watcher, error) {
|
||||
return newWatcher(c, key, rev, false)
|
||||
}
|
||||
|
||||
func NewPrefixWatcher(c *clientv3.Client, prefix string, rev int64) (*Watcher, error) {
|
||||
return newWatcher(c, prefix, rev, true)
|
||||
}
|
||||
|
||||
func newWatcher(c *clientv3.Client, key string, rev int64, isPrefix bool) (*Watcher, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
w, err := c.Watch.Watch(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := &pb.WatchCreateRequest{StartRevision: rev}
|
||||
if isPrefix {
|
||||
req.Prefix = []byte(key)
|
||||
} else {
|
||||
req.Key = []byte(key)
|
||||
}
|
||||
|
||||
if err := w.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{CreateRequest: req}}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wresp, err := w.Recv()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(wresp.Events) != 0 || wresp.Created != true {
|
||||
return nil, ErrWaitMismatch
|
||||
}
|
||||
ret := &Watcher{
|
||||
wstream: w,
|
||||
cancel: cancel,
|
||||
donec: make(chan struct{}),
|
||||
id: storage.WatchID(wresp.WatchId),
|
||||
recvc: make(chan *storagepb.Event),
|
||||
}
|
||||
go ret.recvLoop()
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (w *Watcher) Close() error {
|
||||
defer w.cancel()
|
||||
if w.wstream == nil {
|
||||
return w.lastErr
|
||||
}
|
||||
req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{
|
||||
CancelRequest: &pb.WatchCancelRequest{
|
||||
WatchId: int64(w.id)}}}
|
||||
err := w.wstream.Send(req)
|
||||
if err != nil && w.lastErr == nil {
|
||||
return err
|
||||
}
|
||||
w.wstream.CloseSend()
|
||||
w.donec <- struct{}{}
|
||||
<-w.donec
|
||||
w.wstream = nil
|
||||
return w.lastErr
|
||||
}
|
||||
|
||||
func (w *Watcher) Chan() <-chan *storagepb.Event { return w.recvc }
|
||||
|
||||
func (w *Watcher) recvLoop() {
|
||||
defer close(w.donec)
|
||||
for {
|
||||
wresp, err := w.wstream.Recv()
|
||||
if err != nil {
|
||||
w.lastErr = err
|
||||
break
|
||||
}
|
||||
for i := range wresp.Events {
|
||||
select {
|
||||
case <-w.donec:
|
||||
close(w.recvc)
|
||||
return
|
||||
case w.recvc <- wresp.Events[i]:
|
||||
}
|
||||
}
|
||||
}
|
||||
close(w.recvc)
|
||||
<-w.donec
|
||||
}
|
||||
|
||||
func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
||||
i := 0
|
||||
for {
|
||||
ev, ok := <-w.recvc
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if ev.Type == evs[i] {
|
||||
i++
|
||||
if i == len(evs) {
|
||||
return ev, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, w.Close()
|
||||
}
|
||||
|
||||
// WaitEvents waits on a key until it observes the given events and returns the final one.
|
||||
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
||||
w, err := NewWatcher(c, key, rev)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
w := clientv3.NewWatcher(c)
|
||||
wc := w.Watch(context.Background(), key, rev)
|
||||
if wc == nil {
|
||||
w.Close()
|
||||
return nil, ErrNoWatcher
|
||||
}
|
||||
defer w.Close()
|
||||
return w.waitEvents(evs)
|
||||
return waitEvents(wc, evs), w.Close()
|
||||
}
|
||||
|
||||
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
|
||||
w, err := NewPrefixWatcher(c, prefix, rev)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
w := clientv3.NewWatcher(c)
|
||||
wc := w.WatchPrefix(context.Background(), prefix, rev)
|
||||
if wc == nil {
|
||||
w.Close()
|
||||
return nil, ErrNoWatcher
|
||||
}
|
||||
defer w.Close()
|
||||
return w.waitEvents(evs)
|
||||
return waitEvents(wc, evs), w.Close()
|
||||
}
|
||||
|
||||
func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event {
|
||||
i := 0
|
||||
for wresp := range wc {
|
||||
for _, ev := range wresp.Events {
|
||||
if ev.Type == evs[i] {
|
||||
i++
|
||||
if i == len(evs) {
|
||||
return ev
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user