mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
test: add test cases to verify consistent reading right after writing
Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
parent
1c5289dd73
commit
b385121bec
@ -364,3 +364,95 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
|
||||
}
|
||||
t.Log("no corruption detected.")
|
||||
}
|
||||
|
||||
func TestCtlV3SerializableRead(t *testing.T) {
|
||||
testCtlV3ReadAfterWrite(t, clientv3.WithSerializable())
|
||||
}
|
||||
|
||||
func TestCtlV3LinearizableRead(t *testing.T) {
|
||||
testCtlV3ReadAfterWrite(t)
|
||||
}
|
||||
|
||||
func testCtlV3ReadAfterWrite(t *testing.T, ops ...clientv3.OpOption) {
|
||||
e2e.BeforeTest(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
|
||||
e2e.WithClusterSize(1),
|
||||
e2e.WithEnvVars(map[string]string{"GOFAIL_FAILPOINTS": `raftBeforeSave=sleep("200ms");beforeCommit=sleep("200ms")`}),
|
||||
)
|
||||
require.NoError(t, err, "failed to start etcd cluster: %v", err)
|
||||
defer func() {
|
||||
derr := epc.Close()
|
||||
require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
|
||||
}()
|
||||
|
||||
cc, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: epc.EndpointsGRPC(),
|
||||
DialKeepAliveTime: 5 * time.Second,
|
||||
DialKeepAliveTimeout: 1 * time.Second,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
derr := cc.Close()
|
||||
require.NoError(t, derr)
|
||||
}()
|
||||
|
||||
_, err = cc.Put(ctx, "foo", "bar")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Refer to https://github.com/etcd-io/etcd/pull/16658#discussion_r1341346778
|
||||
t.Log("Restarting the etcd process to ensure all data is persisted")
|
||||
err = epc.Procs[0].Restart(ctx)
|
||||
require.NoError(t, err)
|
||||
epc.WaitLeader(t)
|
||||
|
||||
_, err = cc.Put(ctx, "foo", "bar2")
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Log("Killing the etcd process right after successfully writing a new key/value")
|
||||
err = epc.Procs[0].Kill()
|
||||
require.NoError(t, err)
|
||||
err = epc.Procs[0].Wait(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
stopc := make(chan struct{}, 1)
|
||||
donec := make(chan struct{}, 1)
|
||||
|
||||
t.Log("Starting a goroutine to repeatedly read the key/value")
|
||||
count := 0
|
||||
go func() {
|
||||
defer func() {
|
||||
donec <- struct{}{}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-stopc:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
rctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
resp, rerr := cc.Get(rctx, "foo", ops...)
|
||||
cancel()
|
||||
if rerr != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
count++
|
||||
require.Equal(t, "bar2", string(resp.Kvs[0].Value))
|
||||
}
|
||||
}()
|
||||
|
||||
t.Log("Starting the etcd process again")
|
||||
err = epc.Procs[0].Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
stopc <- struct{}{}
|
||||
|
||||
<-donec
|
||||
assert.Greater(t, count, 0)
|
||||
t.Logf("Checked the key/value %d times", count)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user