|
|
|
@ -48,62 +48,48 @@ func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
|
|
|
|
|
streamID: ids.NewStreamID(),
|
|
|
|
|
idProvider: ids,
|
|
|
|
|
History: History{
|
|
|
|
|
successful: []porcupine.Operation{},
|
|
|
|
|
failed: []porcupine.Operation{},
|
|
|
|
|
operations: []porcupine.Operation{},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendRange(startKey, endKey string, revision, limit int64, start, end time.Duration, resp *clientv3.GetResponse) {
|
|
|
|
|
func (h *AppendableHistory) AppendRange(startKey, endKey string, revision, limit int64, start, end time.Duration, resp *clientv3.GetResponse, err error) {
|
|
|
|
|
request := staleRangeRequest(startKey, endKey, limit, revision)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var respRevision int64
|
|
|
|
|
if resp != nil && resp.Header != nil {
|
|
|
|
|
respRevision = resp.Header.Revision
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: staleRangeRequest(startKey, endKey, limit, revision),
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: rangeResponse(resp.Kvs, resp.Count, respRevision),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
h.appendSuccessful(request, start, end, rangeResponse(resp.Kvs, resp.Count, respRevision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendPut(key, value string, start, end time.Duration, resp *clientv3.PutResponse, err error) {
|
|
|
|
|
request := putRequest(key, value)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
|
if resp != nil && resp.Header != nil {
|
|
|
|
|
revision = resp.Header.Revision
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: putResponse(revision),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
h.appendSuccessful(request, start, end, putResponse(revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Duration, resp *clientv3.PutResponse, err error) {
|
|
|
|
|
request := putWithLeaseRequest(key, value, leaseID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
|
if resp != nil && resp.Header != nil {
|
|
|
|
|
revision = resp.Header.Revision
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: putResponse(revision),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
h.appendSuccessful(request, start, end, putResponse(revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *clientv3.LeaseGrantResponse, err error) {
|
|
|
|
@ -113,45 +99,33 @@ func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *cli
|
|
|
|
|
}
|
|
|
|
|
request := leaseGrantRequest(leaseID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
|
if resp != nil && resp.ResponseHeader != nil {
|
|
|
|
|
revision = resp.ResponseHeader.Revision
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: leaseGrantResponse(revision),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
h.appendSuccessful(request, start, end, leaseGrantResponse(revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendLeaseRevoke(id int64, start, end time.Duration, resp *clientv3.LeaseRevokeResponse, err error) {
|
|
|
|
|
request := leaseRevokeRequest(id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
|
if resp != nil && resp.Header != nil {
|
|
|
|
|
revision = resp.Header.Revision
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: leaseRevokeResponse(revision),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
h.appendSuccessful(request, start, end, leaseRevokeResponse(revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, resp *clientv3.DeleteResponse, err error) {
|
|
|
|
|
request := deleteRequest(key)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
@ -160,13 +134,7 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
|
|
|
|
|
revision = resp.Header.Revision
|
|
|
|
|
deleted = resp.Deleted
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: deleteResponse(deleted, revision),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
h.appendSuccessful(request, start, end, deleteResponse(deleted, revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
|
|
|
|
@ -184,7 +152,7 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, cl
|
|
|
|
|
}
|
|
|
|
|
request := txnRequest(conds, modelOnSuccess, modelOnFailure)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
@ -195,35 +163,18 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, cl
|
|
|
|
|
for _, resp := range resp.Responses {
|
|
|
|
|
results = append(results, toEtcdOperationResult(resp))
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
h.appendSuccessful(request, start, end, txnResponse(results, resp.Succeeded, revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) appendSuccessful(request EtcdRequest, start, end time.Duration, response MaybeEtcdResponse) {
|
|
|
|
|
op := porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: txnResponse(results, resp.Succeeded, revision),
|
|
|
|
|
Output: response,
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) appendSuccessful(op porcupine.Operation) {
|
|
|
|
|
if op.Call >= op.Return {
|
|
|
|
|
panic(fmt.Sprintf("Invalid operation, call(%d) >= return(%d)", op.Call, op.Return))
|
|
|
|
|
}
|
|
|
|
|
if len(h.successful) > 0 {
|
|
|
|
|
prevSuccessful := h.successful[len(h.successful)-1]
|
|
|
|
|
if op.Call <= prevSuccessful.Call {
|
|
|
|
|
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevSuccessful.Call))
|
|
|
|
|
}
|
|
|
|
|
if op.Call <= prevSuccessful.Return {
|
|
|
|
|
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prevSuccessful.Return))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(h.failed) > 0 {
|
|
|
|
|
prevFailed := h.failed[len(h.failed)-1]
|
|
|
|
|
if op.Call <= prevFailed.Call {
|
|
|
|
|
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prevFailed.Call))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
h.successful = append(h.successful, op)
|
|
|
|
|
h.append(op)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
|
|
|
|
@ -298,48 +249,49 @@ func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult {
|
|
|
|
|
func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *clientv3.DefragmentResponse, err error) {
|
|
|
|
|
request := defragmentRequest()
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.appendFailed(request, start.Nanoseconds(), err)
|
|
|
|
|
h.appendFailed(request, start, end, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var revision int64
|
|
|
|
|
if resp != nil && resp.Header != nil {
|
|
|
|
|
revision = resp.Header.Revision
|
|
|
|
|
}
|
|
|
|
|
h.appendSuccessful(porcupine.Operation{
|
|
|
|
|
h.appendSuccessful(request, start, end, defragmentResponse(revision))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) {
|
|
|
|
|
op := porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: start.Nanoseconds(),
|
|
|
|
|
Output: defragmentResponse(revision),
|
|
|
|
|
Output: failedResponse(err),
|
|
|
|
|
Return: end.Nanoseconds(),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
isRead := request.IsRead()
|
|
|
|
|
if !isRead {
|
|
|
|
|
// Failed writes can still be persisted, setting -1 for now as don't know when request has took effect.
|
|
|
|
|
op.Return = -1
|
|
|
|
|
// Operations of single client needs to be sequential.
|
|
|
|
|
// As we don't know return time of failed operations, all new writes need to be done with new stream id.
|
|
|
|
|
h.streamID = h.idProvider.NewStreamID()
|
|
|
|
|
}
|
|
|
|
|
h.append(op)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err error) {
|
|
|
|
|
if len(h.successful) > 0 {
|
|
|
|
|
prevSuccessful := h.successful[len(h.successful)-1]
|
|
|
|
|
if call <= prevSuccessful.Call {
|
|
|
|
|
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", call, prevSuccessful.Call))
|
|
|
|
|
func (h *AppendableHistory) append(op porcupine.Operation) {
|
|
|
|
|
if op.Return != -1 && op.Call >= op.Return {
|
|
|
|
|
panic(fmt.Sprintf("Invalid operation, call(%d) >= return(%d)", op.Call, op.Return))
|
|
|
|
|
}
|
|
|
|
|
if len(h.operations) > 0 {
|
|
|
|
|
prev := h.operations[len(h.operations)-1]
|
|
|
|
|
if op.Call <= prev.Call {
|
|
|
|
|
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", op.Call, prev.Call))
|
|
|
|
|
}
|
|
|
|
|
if call <= prevSuccessful.Return {
|
|
|
|
|
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", call, prevSuccessful.Return))
|
|
|
|
|
if op.Call <= prev.Return {
|
|
|
|
|
panic(fmt.Sprintf("Overlapping operations, new.call(%d) <= prev.return(%d)", op.Call, prev.Return))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(h.failed) > 0 {
|
|
|
|
|
prevFailed := h.failed[len(h.failed)-1]
|
|
|
|
|
if call <= prevFailed.Call {
|
|
|
|
|
panic(fmt.Sprintf("Out of order append, new.call(%d) <= prev.call(%d)", call, prevFailed.Call))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
h.failed = append(h.failed, porcupine.Operation{
|
|
|
|
|
ClientId: h.streamID,
|
|
|
|
|
Input: request,
|
|
|
|
|
Call: call,
|
|
|
|
|
Output: failedResponse(err),
|
|
|
|
|
Return: 0, // For failed writes we don't know when request has really finished.
|
|
|
|
|
})
|
|
|
|
|
// Operations of single client needs to be sequential.
|
|
|
|
|
// As we don't know return time of failed operations, all new writes need to be done with new stream id.
|
|
|
|
|
h.streamID = h.idProvider.NewStreamID()
|
|
|
|
|
h.operations = append(h.operations, op)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getRequest(key string) EtcdRequest {
|
|
|
|
@ -493,54 +445,48 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type History struct {
|
|
|
|
|
successful []porcupine.Operation
|
|
|
|
|
// failed requests are kept separate as we don't know return time of failed operations.
|
|
|
|
|
// Based on https://github.com/anishathalye/porcupine/issues/10
|
|
|
|
|
failed []porcupine.Operation
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h History) Merge(h2 History) History {
|
|
|
|
|
result := History{
|
|
|
|
|
successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)),
|
|
|
|
|
failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)),
|
|
|
|
|
}
|
|
|
|
|
result.successful = append(result.successful, h.successful...)
|
|
|
|
|
result.successful = append(result.successful, h2.successful...)
|
|
|
|
|
result.failed = append(result.failed, h.failed...)
|
|
|
|
|
result.failed = append(result.failed, h2.failed...)
|
|
|
|
|
return result
|
|
|
|
|
operations []porcupine.Operation
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h History) Len() int {
|
|
|
|
|
return len(h.successful) + len(h.failed)
|
|
|
|
|
return len(h.operations)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h History) Operations() []porcupine.Operation {
|
|
|
|
|
operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed))
|
|
|
|
|
var maxTime int64
|
|
|
|
|
for _, op := range h.successful {
|
|
|
|
|
operations = append(operations, op)
|
|
|
|
|
if op.Return > maxTime {
|
|
|
|
|
maxTime = op.Return
|
|
|
|
|
operations := make([]porcupine.Operation, 0, len(h.operations))
|
|
|
|
|
var maxTime = h.lastObservedTime()
|
|
|
|
|
for _, op := range h.operations {
|
|
|
|
|
// Failed requests don't have a known return time.
|
|
|
|
|
if op.Return == -1 {
|
|
|
|
|
// Simulate Infinity by using last observed time.
|
|
|
|
|
op.Return = maxTime + time.Second.Nanoseconds()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, op := range h.failed {
|
|
|
|
|
if op.Call > maxTime {
|
|
|
|
|
maxTime = op.Call
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Failed requests don't have a known return time.
|
|
|
|
|
// Simulate Infinity by using last observed time.
|
|
|
|
|
for _, op := range h.failed {
|
|
|
|
|
op.Return = maxTime + time.Second.Nanoseconds()
|
|
|
|
|
operations = append(operations, op)
|
|
|
|
|
}
|
|
|
|
|
return operations
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h History) lastObservedTime() int64 {
|
|
|
|
|
var maxTime int64
|
|
|
|
|
for _, op := range h.operations {
|
|
|
|
|
if op.Return == -1 {
|
|
|
|
|
// Collect call time from failed operations
|
|
|
|
|
if op.Call > maxTime {
|
|
|
|
|
maxTime = op.Call
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Collect return time from successful operations
|
|
|
|
|
if op.Return > maxTime {
|
|
|
|
|
maxTime = op.Return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return maxTime
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h History) MaxRevision() int64 {
|
|
|
|
|
var maxRevision int64
|
|
|
|
|
for _, op := range h.successful {
|
|
|
|
|
for _, op := range h.operations {
|
|
|
|
|
revision := op.Output.(MaybeEtcdResponse).Revision
|
|
|
|
|
if revision > maxRevision {
|
|
|
|
|
maxRevision = revision
|
|
|
|
|