-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathmemdclient.go
More file actions
114 lines (93 loc) · 2.31 KB
/
memdclient.go
File metadata and controls
114 lines (93 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package gocbcorex
import (
"context"
"sync"
"time"
"github.com/couchbase/gocbcorex/memdx"
)
// syncCrudResult is the outcome of a single operation as handed from the
// operation's callback to the goroutine waiting on it.
type syncCrudResult struct {
	// Result is the response value, boxed so that one result type can serve
	// every response type; the receiver type-asserts it back.
	Result any
	// Err is the error delivered alongside Result, or nil.
	Err error
}
// syncCrudResulter carries the channel on which an operation's callback
// delivers its syncCrudResult to the goroutine waiting for it. Instances are
// recycled through syncCrudResulterPool.
type syncCrudResulter struct {
// Ch receives the operation outcome. allocSyncCrudResulter creates it with
// a buffer of one.
Ch chan syncCrudResult
}
// syncCrudResulterPool holds released *syncCrudResulter values for reuse by
// allocSyncCrudResulter. No New function is set, so Get returns nil when the
// pool has nothing to offer and the caller must allocate.
var syncCrudResulterPool sync.Pool
// allocSyncCrudResulter hands out a resulter taken from syncCrudResulterPool,
// falling back to a newly allocated one (with a one-slot buffered channel)
// when the pool returns nothing.
func allocSyncCrudResulter() *syncCrudResulter {
	if pooled := syncCrudResulterPool.Get(); pooled != nil {
		return pooled.(*syncCrudResulter)
	}

	fresh := new(syncCrudResulter)
	fresh.Ch = make(chan syncCrudResult, 1)
	return fresh
}
// releaseSyncCrudResulter puts v back into syncCrudResulterPool so a later
// allocSyncCrudResulter call can reuse it. The channel is not drained here.
// NOTE(review): callers appear to be expected to have consumed any pending
// result before releasing — confirm.
func releaseSyncCrudResulter(v *syncCrudResulter) {
syncCrudResulterPool.Put(v)
}
// MemdClientTelem is the telemetry hook exposed by a MemdClient for its
// operations.
type MemdClientTelem interface {
// BeginOp starts telemetry for one operation identified by bucketName and
// opName. It returns a context together with the per-operation handle;
// memdClient_SimpleCall uses the returned context for the rest of the call.
BeginOp(ctx context.Context, bucketName string, opName string) (context.Context, MemdClientTelemOp)
}
// MemdClientTelemOp is the telemetry handle for a single operation, as
// returned by MemdClientTelem.BeginOp. The notes below describe how
// memdClient_SimpleCall drives it.
type MemdClientTelemOp interface {
// IsRecording gates the server-duration extraction: RecordServerDuration
// is only called when this returns true. Presumably it reports whether
// this operation is actually being recorded — confirm with implementers.
IsRecording() bool
// MarkSent is called once the operation has been dispatched without error.
MarkSent()
// MarkReceived is called from the response callback, before the result is
// handed to the waiting goroutine.
MarkReceived()
// RecordServerDuration is passed the duration reported by a response that
// implements memdx.ServerDurationResponse.
RecordServerDuration(d time.Duration)
// End finishes the operation's telemetry with the error to report, or nil.
End(ctx context.Context, err error)
}
// MemdClient is a MemdxClient that additionally exposes the bucket it has
// selected and its telemetry hook.
type MemdClient interface {
// SelectedBucket returns the selected bucket's name. memdClient_SimpleCall
// treats an empty string as "no bucket" and skips KvBucketError wrapping.
SelectedBucket() string
// Telemetry returns the hook used to begin per-operation telemetry.
Telemetry() MemdClientTelem
// MemdxClient is declared elsewhere; memdClient_SimpleCall passes the
// client to execFn as its memdx.Dispatcher.
MemdxClient
}
// memdClient_SimpleCall runs a single callback-style memdx operation and
// blocks until it completes or ctx is done.
//
// It begins a telemetry operation for the client's selected bucket and the
// request's op name, dispatches req through execFn with c as the dispatcher,
// and waits for the callback to deliver the response. Errors delivered by the
// callback are wrapped in a KvBucketError when a bucket is selected. If
// dispatch itself fails, the zero RespT and the dispatch error are returned.
// If ctx is done first, the pending operation is cancelled with ctx.Err() and
// the function still waits for the callback before returning its result.
//
// Telemetry is ended with the error the operation actually produced on the
// normal path, and with ctx.Err() on the cancellation path.
func memdClient_SimpleCall[Encoder any, ReqT memdx.OpRequest, RespT memdx.OpResponse](
	ctx context.Context,
	c MemdClient,
	o Encoder,
	execFn func(o Encoder, d memdx.Dispatcher, req ReqT, cb func(RespT, error)) (memdx.PendingOp, error),
	req ReqT,
) (RespT, error) {
	bucketName := c.SelectedBucket()

	ctx, opTelem := c.Telemetry().BeginOp(ctx, bucketName, req.OpName())

	resulter := allocSyncCrudResulter()

	// The callback's error is named opErr so it cannot be confused with the
	// dispatch error returned by execFn below.
	pendingOp, err := execFn(o, c, req, func(resp RespT, opErr error) {
		if opErr != nil && bucketName != "" {
			opErr = &KvBucketError{
				Cause:      opErr,
				BucketName: bucketName,
			}
		}

		opTelem.MarkReceived()

		// Only inspect the response for a server duration when telemetry is
		// recording and a non-zero response was delivered.
		if opTelem.IsRecording() {
			var emptyResp RespT
			if resp != emptyResp {
				if sdResp, ok := any(resp).(memdx.ServerDurationResponse); ok {
					opTelem.RecordServerDuration(sdResp.GetServerDuration())
				}
			}
		}

		resulter.Ch <- syncCrudResult{
			Result: resp,
			Err:    opErr,
		}
	})
	if err != nil {
		releaseSyncCrudResulter(resulter)
		opTelem.End(ctx, err)

		var emptyResp RespT
		return emptyResp, err
	}

	opTelem.MarkSent()

	select {
	case res := <-resulter.Ch:
		releaseSyncCrudResulter(resulter)
		// Report the operation's own outcome; the dispatch error is always
		// nil by the time we get here.
		opTelem.End(ctx, res.Err)
		return res.Result.(RespT), res.Err
	case <-ctx.Done():
		pendingOp.Cancel(ctx.Err())
		// Wait for the callback so the channel is empty before the resulter
		// goes back to the pool. Assumes the callback is always invoked after
		// Cancel; otherwise this receive would block.
		res := <-resulter.Ch
		releaseSyncCrudResulter(resulter)
		opTelem.End(ctx, ctx.Err())
		return res.Result.(RespT), res.Err
	}
}