-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathcollectionresolver.go
More file actions
117 lines (100 loc) · 3.31 KB
/
collectionresolver.go
File metadata and controls
117 lines (100 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package gocbcorex
import (
"context"
"errors"
"github.com/couchbase/gocbcorex/memdx"
)
type CollectionResolver interface {
ResolveCollectionID(ctx context.Context, scopeName, collectionName string) (collectionId uint32, manifestRev uint64, err error)
InvalidateCollectionID(ctx context.Context, scopeName, collectionName, endpoint string, manifestRev uint64)
}
func OrchestrateMemdCollectionID[RespT any](
ctx context.Context,
cr CollectionResolver,
scopeName, collectionName string,
collectionID uint32,
fn func(collectionID uint32) (RespT, error),
) (RespT, error) {
if collectionID > 0 && collectionName == "" && scopeName == "" {
// If there's an unknown collection ID error then we'll just propagate it.
return fn(collectionID)
}
resolvedCid, manifestRev, err := cr.ResolveCollectionID(ctx, scopeName, collectionName)
if err != nil {
var emptyResp RespT
return emptyResp, err
}
if collectionID > 0 && resolvedCid != collectionID {
cr.InvalidateCollectionID(
ctx,
scopeName, collectionName,
"", 0)
newCollectionID, newManifestRev, newResolveErr :=
cr.ResolveCollectionID(ctx, scopeName, collectionName)
if newResolveErr != nil {
var emptyResp RespT
return emptyResp, newResolveErr
}
if newCollectionID != collectionID {
// If we still don't match after resolution, then we can confidently say that we have the latest
// so the callee must have an out of date collection ID.
var emptyResp RespT
return emptyResp, &CollectionIDMismatchError{
CollectionID: collectionID,
ServerCollectionID: newCollectionID,
ManifestUid: newManifestRev,
}
}
}
for {
res, err := fn(resolvedCid)
if err != nil {
if errors.Is(err, memdx.ErrUnknownCollectionID) {
invalidatingEndpoint := ""
invalidatingManifestRev := uint64(0)
var serverErr *memdx.ServerErrorWithContext
if errors.As(err, &serverErr) {
serverCtx := serverErr.ParseContext()
invalidatingManifestRev = serverCtx.ManifestRev
}
if invalidatingManifestRev > 0 &&
invalidatingManifestRev < manifestRev {
var emptyResp RespT
return emptyResp, &CollectionManifestOutdatedError{
Cause: err,
ManifestUid: manifestRev,
ServerManifestUid: invalidatingManifestRev,
}
}
cr.InvalidateCollectionID(
ctx,
scopeName, collectionName,
invalidatingEndpoint, invalidatingManifestRev)
newCollectionID, newManifestRev, newResolveErr :=
cr.ResolveCollectionID(ctx, scopeName, collectionName)
if newResolveErr != nil {
var emptyResp RespT
return emptyResp, newResolveErr
}
if newCollectionID == resolvedCid {
// if resolution yielded the same response, this means that our ability
// to fetch an updated collection id is compromised, or the server is in
// an older state. In both instances, we no longer have a deterministic
// path to resolution and return the error, allowing retries to occur
// at a higher level if desired.
var emptyResp RespT
return emptyResp, &CollectionManifestOutdatedError{
Cause: err,
ManifestUid: manifestRev,
ServerManifestUid: invalidatingManifestRev,
}
}
resolvedCid = newCollectionID
manifestRev = newManifestRev
continue
}
return res, err
}
return res, nil
}
}