Refactor merge succesfull and failed operation in history

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2024-04-10 10:52:30 +02:00
parent 229275d46e
commit 65130c6d21

View File

@ -48,8 +48,7 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
streamID: ids.NewStreamID(),
idProvider: ids,
History: History{
successful: []porcupine.Operation{},
failed: []porcupine.Operation{},
operations: []porcupine.Operation{},
},
}
}
@ -173,22 +172,16 @@ func (h *AppendableHistory) appendSuccessful(request EtcdRequest, start, end tim
if op.Call >= op.Return {
panic(fmt.Sprintf("Invalid operation, call(%d) >= return(%d)", op.Call, op.Return))
}
if len(h.successful) > 0 {
prevSuccessful := h.successful[len(h.successful)-1]
if op.Call <= prevSuccessful.Call {
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevSuccessful.Call))
if len(h.operations) > 0 {
prev := h.operations[len(h.operations)-1]
if op.Call <= prev.Call {
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prev.Call))
}
if op.Call <= prevSuccessful.Return {
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prevSuccessful.Return))
if op.Call <= prev.Return {
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prev.Return))
}
}
if len(h.failed) > 0 {
prevFailed := h.failed[len(h.failed)-1]
if op.Call <= prevFailed.Call {
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevFailed.Call))
}
}
h.successful = append(h.successful, op)
h.operations = append(h.operations, op)
}
func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
@ -279,24 +272,18 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Duratio
Input: request,
Call: start.Nanoseconds(),
Output: failedResponse(err),
Return: 0, // For failed writes we don't know when request has really finished.
Return: -1, // For failed writes we don't know when request has really finished.
}
if len(h.successful) > 0 {
prevSuccessful := h.successful[len(h.successful)-1]
if op.Call <= prevSuccessful.Call {
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevSuccessful.Call))
if len(h.operations) > 0 {
prev := h.operations[len(h.operations)-1]
if op.Call <= prev.Call {
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prev.Call))
}
if op.Call <= prevSuccessful.Return {
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prevSuccessful.Return))
if op.Call <= prev.Return {
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prev.Return))
}
}
if len(h.failed) > 0 {
prevFailed := h.failed[len(h.failed)-1]
if op.Call <= prevFailed.Call {
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevFailed.Call))
}
}
h.failed = append(h.failed, op)
h.operations = append(h.operations, op)
// Operations of single client needs to be sequential.
// As we don't know return time of failed operations, all new writes need to be done with new stream id.
h.streamID = h.idProvider.NewStreamID()
@ -453,54 +440,48 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
}
type History struct {
successful []porcupine.Operation
// failed requests are kept separate as we don't know return time of failed operations.
// Based on https://github.com/anishathalye/porcupine/issues/10
failed []porcupine.Operation
}
func (h History) Merge(h2 History) History {
result := History{
successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)),
failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)),
}
result.successful = append(result.successful, h.successful...)
result.successful = append(result.successful, h2.successful...)
result.failed = append(result.failed, h.failed...)
result.failed = append(result.failed, h2.failed...)
return result
operations []porcupine.Operation
}
func (h History) Len() int {
return len(h.successful) + len(h.failed)
return len(h.operations)
}
func (h History) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
var maxTime int64
for _, op := range h.successful {
operations = append(operations, op)
if op.Return > maxTime {
maxTime = op.Return
operations := make([]porcupine.Operation, 0, len(h.operations))
var maxTime = h.lastObservedTime()
for _, op := range h.operations {
// Failed requests don't have a known return time.
if op.Return == -1 {
// Simulate Infinity by using last observed time.
op.Return = maxTime + time.Second.Nanoseconds()
}
}
for _, op := range h.failed {
if op.Call > maxTime {
maxTime = op.Call
}
}
// Failed requests don't have a known return time.
// Simulate Infinity by using last observed time.
for _, op := range h.failed {
op.Return = maxTime + time.Second.Nanoseconds()
operations = append(operations, op)
}
return operations
}
func (h History) lastObservedTime() int64 {
var maxTime int64
for _, op := range h.operations {
if op.Return == -1 {
// Collect call time from failed operations
if op.Call > maxTime {
maxTime = op.Call
}
} else {
// Collect return time from successful operations
if op.Return > maxTime {
maxTime = op.Return
}
}
}
return maxTime
}
func (h History) MaxRevision() int64 {
var maxRevision int64
for _, op := range h.successful {
for _, op := range h.operations {
revision := op.Output.(MaybeEtcdResponse).Revision
if revision > maxRevision {
maxRevision = revision