Merge pull request #14685 from serathius/linearizability-revision

Revision inconsistency caused by panic during defrag
This commit is contained in:
Marek Siarkowicz 2022-11-14 12:49:22 +01:00 committed by GitHub
commit ca8baeb308
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 145 additions and 66 deletions

View File

@ -66,7 +66,7 @@ func (c *recordingClient) Get(ctx context.Context, key string) error {
ClientId: c.id, ClientId: c.id,
Input: etcdRequest{op: Get, key: key}, Input: etcdRequest{op: Get, key: key},
Call: callTime.UnixNano(), Call: callTime.UnixNano(),
Output: etcdResponse{getData: readData}, Output: etcdResponse{getData: readData, revision: resp.Header.Revision},
Return: returnTime.UnixNano(), Return: returnTime.UnixNano(),
}) })
return nil return nil
@ -74,13 +74,17 @@ func (c *recordingClient) Get(ctx context.Context, key string) error {
func (c *recordingClient) Put(ctx context.Context, key, value string) error { func (c *recordingClient) Put(ctx context.Context, key, value string) error {
callTime := time.Now() callTime := time.Now()
_, err := c.client.Put(ctx, key, value) resp, err := c.client.Put(ctx, key, value)
returnTime := time.Now() returnTime := time.Now()
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
c.operations = append(c.operations, porcupine.Operation{ c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id, ClientId: c.id,
Input: etcdRequest{op: Put, key: key, putData: value}, Input: etcdRequest{op: Put, key: key, putData: value},
Call: callTime.UnixNano(), Call: callTime.UnixNano(),
Output: etcdResponse{err: err}, Output: etcdResponse{err: err, revision: revision},
Return: returnTime.UnixNano(), Return: returnTime.UnixNano(),
}) })
return nil return nil

View File

@ -34,12 +34,14 @@ type etcdRequest struct {
type etcdResponse struct { type etcdResponse struct {
getData string getData string
revision int64
err error err error
} }
type EtcdState struct { type EtcdState struct {
Key string Key string
Value string Value string
LastRevision int64
FailedWrites map[string]struct{} FailedWrites map[string]struct{}
} }
@ -51,9 +53,6 @@ var etcdModel = porcupine.Model{
if err != nil { if err != nil {
panic(err) panic(err)
} }
if state.FailedWrites == nil {
state.FailedWrites = map[string]struct{}{}
}
ok, state := step(state, in.(etcdRequest), out.(etcdResponse)) ok, state := step(state, in.(etcdRequest), out.(etcdResponse))
data, err := json.Marshal(state) data, err := json.Marshal(state)
if err != nil { if err != nil {
@ -64,22 +63,19 @@ var etcdModel = porcupine.Model{
DescribeOperation: func(in, out interface{}) string { DescribeOperation: func(in, out interface{}) string {
request := in.(etcdRequest) request := in.(etcdRequest)
response := out.(etcdResponse) response := out.(etcdResponse)
var resp string
switch request.op { switch request.op {
case Get: case Get:
if response.err != nil { if response.err != nil {
resp = response.err.Error() return fmt.Sprintf("get(%q) -> %q", request.key, response.err)
} else { } else {
resp = response.getData return fmt.Sprintf("get(%q) -> %q, rev: %d", request.key, response.getData, response.revision)
} }
return fmt.Sprintf("get(%q) -> %q", request.key, resp)
case Put: case Put:
if response.err != nil { if response.err != nil {
resp = response.err.Error() return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, response.err)
} else { } else {
resp = "ok" return fmt.Sprintf("put(%q, %q) -> ok, rev: %d", request.key, request.putData, response.revision)
} }
return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, resp)
default: default:
return "<invalid>" return "<invalid>"
} }
@ -88,33 +84,68 @@ var etcdModel = porcupine.Model{
func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) {
if request.key == "" { if request.key == "" {
panic("Invalid request") panic("invalid request")
} }
if state.Key == "" { if state.Key == "" {
state.Key = request.key return true, initState(request, response)
} }
if state.Key != request.key { if state.Key != request.key {
panic("Multiple keys not supported") panic("Multiple keys not supported")
} }
switch request.op { switch request.op {
case Get: case Get:
if state.Value == response.getData { return stepGet(state, request, response)
return true, state case Put:
return stepPut(state, request, response)
default:
panic("Unknown operation")
} }
for write := range state.FailedWrites { }
if write == response.getData {
func initState(request etcdRequest, response etcdResponse) EtcdState {
state := EtcdState{
Key: request.key,
LastRevision: response.revision,
FailedWrites: map[string]struct{}{},
}
switch request.op {
case Get:
state.Value = response.getData state.Value = response.getData
delete(state.FailedWrites, write)
return true, state
}
}
case Put: case Put:
if response.err == nil { if response.err == nil {
state.Value = request.putData state.Value = request.putData
} else { } else {
state.FailedWrites[request.putData] = struct{}{} state.FailedWrites[request.putData] = struct{}{}
} }
default:
panic("Unknown operation")
}
return state
}
func stepGet(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) {
if state.Value == response.getData && state.LastRevision <= response.revision {
return true, state
}
_, ok := state.FailedWrites[response.getData]
if ok && state.LastRevision < response.revision {
state.Value = response.getData
state.LastRevision = response.revision
delete(state.FailedWrites, response.getData)
return true, state return true, state
} }
return false, state return false, state
} }
func stepPut(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) {
if response.err != nil {
state.FailedWrites[request.putData] = struct{}{}
return true, state
}
if state.LastRevision >= response.revision {
return false, state
}
state.Value = request.putData
state.LastRevision = response.revision
return true, state
}

View File

@ -16,68 +16,112 @@ package linearizability
import ( import (
"errors" "errors"
"github.com/anishathalye/porcupine"
"testing" "testing"
) )
func TestModel(t *testing.T) { func TestModel(t *testing.T) {
tcs := []struct { tcs := []struct {
name string name string
okOperations []porcupine.Operation operations []testOperation
failOperation *porcupine.Operation
}{ }{
{ {
name: "Etcd must return what was written", name: "First Get can start from non-empty value and non-zero revision",
okOperations: []porcupine.Operation{ operations: []testOperation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{}}, {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 42}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}},
},
failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "2"}},
},
{
name: "Etcd can crash after storing result but before returning success to client",
okOperations: []porcupine.Operation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}},
}, },
}, },
{ {
name: "Etcd can crash before storing result", name: "First Put can start from non-zero revision",
okOperations: []porcupine.Operation{ operations: []testOperation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 42}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}},
}, },
}, },
{ {
name: "Etcd can continue errored request after it failed", name: "Get response data should match PUT",
okOperations: []porcupine.Operation{ operations: []testOperation{
{Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 1}, failure: true},
{Input: etcdRequest{op: Put, key: "key"}, Output: etcdResponse{getData: "2"}}, {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "1", revision: 1}},
{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, },
},
{
name: "Get response revision should be equal or greater then put",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key"}, resp: etcdResponse{revision: 2}},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 1}, failure: true},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 2}},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 4}},
},
},
{
name: "Put bumps revision",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 1}, failure: true},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}},
},
},
{
name: "Put can fail and be lost",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}},
{req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 2}},
},
},
{
name: "Put can fail but bump revision",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}},
{req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 3}},
},
},
{
name: "Put can fail but be persisted and bump revision",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 1}, failure: true},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}},
},
},
{
name: "Put can fail but be persisted later",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{err: errors.New("failed")}},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "1", revision: 3}},
},
},
{
name: "Put can fail but bump revision later",
operations: []testOperation{
{req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{err: errors.New("failed")}},
{req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}},
{req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}},
{req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 4}},
}, },
failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}},
}, },
} }
for _, tc := range tcs { for _, tc := range tcs {
var ok bool var ok bool
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
state := etcdModel.Init() state := etcdModel.Init()
for _, op := range tc.okOperations { for _, op := range tc.operations {
t.Logf("state: %v", state) t.Logf("state: %v", state)
ok, state = etcdModel.Step(state, op.Input, op.Output) ok, state = etcdModel.Step(state, op.req, op.resp)
if !ok { if ok != !op.failure {
t.Errorf("Unexpected failed operation: %s", etcdModel.DescribeOperation(op.Input, op.Output)) t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, etcdModel.DescribeOperation(op.req, op.resp))
} }
} }
if tc.failOperation != nil {
t.Logf("state: %v", state)
ok, state = etcdModel.Step(state, tc.failOperation.Input, tc.failOperation.Output)
if ok {
t.Errorf("Unexpected succesfull operation: %s", etcdModel.DescribeOperation(tc.failOperation.Input, tc.failOperation.Output))
}
}
}) })
} }
} }
type testOperation struct {
req etcdRequest
resp etcdResponse
failure bool
}