mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: fix sync base
It is not correct to use WithPrefix. Range end will change in every internal batch.
This commit is contained in:
parent
88a9cf2cea
commit
16b0c1d1e1
@ -15,7 +15,9 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
|
|||||||
t.Fatal("failed to receive update in one second")
|
t.Fatal("failed to receive update in one second")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMirrorSyncBase(t *testing.T) {
|
||||||
|
cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer cluster.Terminate(nil)
|
||||||
|
|
||||||
|
cli := cluster.Client(0)
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
keyCh := make(chan string)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for key := range keyCh {
|
||||||
|
if _, err := cli.Put(ctx, key, "test"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 2000; i++ {
|
||||||
|
keyCh <- fmt.Sprintf("test%d", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(keyCh)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
syncer := mirror.NewSyncer(cli, "test", 0)
|
||||||
|
respCh, errCh := syncer.SyncBase(ctx)
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
|
||||||
|
for resp := range respCh {
|
||||||
|
count = count + len(resp.Kvs)
|
||||||
|
if !resp.More {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for err := range errCh {
|
||||||
|
t.Fatalf("unexpected error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if count != 2000 {
|
||||||
|
t.Errorf("unexpected kv count: %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
|
|||||||
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
|
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
|
||||||
// We then range from the prefix to the next prefix if exists. Or we will
|
// We then range from the prefix to the next prefix if exists. Or we will
|
||||||
// range from the prefix to the end if the next prefix does not exists.
|
// range from the prefix to the end if the next prefix does not exists.
|
||||||
opts = append(opts, clientv3.WithPrefix())
|
opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
|
||||||
key = s.prefix
|
key = s.prefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user