mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	 45b007b8b4
			
		
	
	
		45b007b8b4
		
	
	
	
	
		
			
			Recipes is a set of pattern/primitive implementations built on top of clientv3. It is used by integration tests and should not be considered "server" code.
		
			
				
	
	
		
			78 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			78 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2016 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package recipe
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 
 | |
| 	"go.etcd.io/etcd/api/v3/mvccpb"
 | |
| 	v3 "go.etcd.io/etcd/client/v3"
 | |
| )
 | |
| 
 | |
| // Queue implements a multi-reader, multi-writer distributed queue.
 | |
| type Queue struct {
 | |
| 	client *v3.Client
 | |
| 	ctx    context.Context
 | |
| 
 | |
| 	keyPrefix string
 | |
| }
 | |
| 
 | |
| func NewQueue(client *v3.Client, keyPrefix string) *Queue {
 | |
| 	return &Queue{client, context.TODO(), keyPrefix}
 | |
| }
 | |
| 
 | |
| func (q *Queue) Enqueue(val string) error {
 | |
| 	_, err := newUniqueKV(q.client, q.keyPrefix, val)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Dequeue returns Enqueue()'d elements in FIFO order. If the
 | |
| // queue is empty, Dequeue blocks until elements are available.
 | |
| func (q *Queue) Dequeue() (string, error) {
 | |
| 	// TODO: fewer round trips by fetching more than one key
 | |
| 	resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	kv, err := claimFirstKey(q.client, resp.Kvs)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	} else if kv != nil {
 | |
| 		return string(kv.Value), nil
 | |
| 	} else if resp.More {
 | |
| 		// missed some items, retry to read in more
 | |
| 		return q.Dequeue()
 | |
| 	}
 | |
| 
 | |
| 	// nothing yet; wait on elements
 | |
| 	ev, err := WaitPrefixEvents(
 | |
| 		q.client,
 | |
| 		q.keyPrefix,
 | |
| 		resp.Header.Revision,
 | |
| 		[]mvccpb.Event_EventType{mvccpb.PUT})
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	} else if !ok {
 | |
| 		return q.Dequeue()
 | |
| 	}
 | |
| 	return string(ev.Kv.Value), err
 | |
| }
 |