Skip to content

Commit 17776b9

Browse files
committed
feat: 调整缓存 refresh 策略,增加重试逻辑和清空逻辑
1 parent 9aebf68 commit 17776b9

File tree

2 files changed

+106
-46
lines changed

2 files changed

+106
-46
lines changed

kratos/caching/loadable_cache.go

Lines changed: 65 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package caching
22

33
import (
44
"context"
5-
"log"
65
"sync"
76
"time"
87

@@ -35,16 +34,17 @@ type LoadableCache[K comparable, V any] interface {
3534
type loadableCache[K comparable, V any] struct {
3635
mu sync.RWMutex
3736

38-
tracer trace.Tracer // 链路 provider
39-
traced bool // 是否将普通函数包装为 tracer
40-
c gcache.Cache[K, V] // gcache 对象
41-
exp time.Duration // key 过期时间
42-
size int // 缓存大小,超出的缓存会被 evict
43-
block bool // 是否阻塞当前调用链
44-
retryCount int // 重试次数 (几次后依旧空数据则认为空数据)
45-
refresh func() (map[K]V, error) // 刷新缓存数据的函数
46-
ticker *time.Ticker // 定时器(用于过期刷缓存)
47-
stop chan struct{} // 停止信号
37+
tracer trace.Tracer // 链路 provider
38+
traced bool // 是否将普通函数包装为 tracer
39+
c gcache.Cache[K, V] // gcache 对象
40+
exp time.Duration // key 过期时间
41+
size int // 缓存大小,超出的缓存会被 evict
42+
block bool // 是否阻塞当前调用链
43+
retryCount uint // 重试次数 (几次后依旧空数据则认为空数据) 默认0
44+
currentRetry uint // 当前重试次数,每次成功是需要重置为0
45+
refresh func() (map[K]V, error) // 刷新缓存数据的函数
46+
ticker *time.Ticker // 定时器(用于过期刷缓存)
47+
stop chan struct{} // 停止信号
4848
}
4949

5050
type Option[K comparable, V any] func(*loadableCache[K, V])
@@ -77,6 +77,13 @@ func WithBlock[K comparable, V any]() Option[K, V] {
7777
}
7878
}
7979

80+
// WithRetryCount retry count
81+
func WithRetryCount[K comparable, V any](count uint) Option[K, V] {
82+
return func(cb *loadableCache[K, V]) {
83+
cb.retryCount = count
84+
}
85+
}
86+
8087
// WithTracing enable otel tracing
8188
func WithTracing[K comparable, V any](provider trace.TracerProvider) Option[K, V] {
8289
return func(cb *loadableCache[K, V]) {
@@ -88,14 +95,15 @@ func WithTracing[K comparable, V any](provider trace.TracerProvider) Option[K, V
8895
}
8996
}
9097

91-
9298
func New[K comparable, V any](opts ...Option[K, V]) LoadableCache[K, V] {
9399
cache := &loadableCache[K, V]{
94-
exp: 10 * time.Second,
95-
size: 100,
96-
block: false,
97-
traced: false,
98-
stop: make(chan struct{}, 1),
100+
exp: 10 * time.Second,
101+
size: 100,
102+
block: false,
103+
traced: false,
104+
retryCount: 0,
105+
currentRetry: 0,
106+
stop: make(chan struct{}, 1),
99107
}
100108
// bind options
101109
for _, opt := range opts {
@@ -109,22 +117,20 @@ func New[K comparable, V any](opts ...Option[K, V]) LoadableCache[K, V] {
109117
if cache.refresh != nil {
110118
cache.ticker = time.NewTicker(cache.exp)
111119

120+
// 初始化时第一次加载缓存数据
121+
// 重试3次, 每次间隔最多1秒
122+
// 如果3次都失败,则为空缓存
112123
firstLoad := func() {
113-
114-
115-
116-
if cache.refresh != nil {
117-
if err := retry.Do(func() error {
118-
ret, err := cache.refresh()
119-
if err != nil {
120-
return err
121-
}
122-
cache.putAll(ret)
123-
return nil
124-
}); err != nil {
125-
panic(err)
124+
load := func() error {
125+
ret, err := cache.refresh()
126+
if err != nil {
127+
return err
126128
}
129+
cache.putAll(ret)
130+
return nil
127131
}
132+
133+
_ = retry.Do(load, retry.Attempts(3), retry.MaxJitter(time.Second))
128134
}
129135
// block 状态不使用 goroutine, 卡住当前调用链等待结束
130136
if cache.block {
@@ -203,34 +209,52 @@ func (cb *loadableCache[K, V]) Stop(ctx context.Context) {
203209
func (cb *loadableCache[K, V]) Restart(ctx context.Context) {
204210
cb.ticker = time.NewTicker(cb.exp)
205211

206-
go cb.rf()
212+
if cb.refresh != nil {
213+
go cb.rf()
214+
}
207215
}
208216

209217
func (cb *loadableCache[K, V]) rf() {
218+
load := func() {
219+
ret, err := cb.refresh()
220+
if err != nil {
221+
// 当重试次数达到上限,则将现在的空数据写入到缓存中
222+
if cb.retryCount > 0 {
223+
cb.currentRetry++
224+
if cb.currentRetry < cb.retryCount {
225+
return
226+
}
227+
}
228+
// 没配置重试,则返回err时,不覆盖缓存数据;防止缓存雪崩;需要清理缓存请手动执行 caching.Purge();
229+
return
230+
}
231+
232+
if cb.retryCount > 0 {
233+
cb.currentRetry = 0
234+
}
235+
_ = cb.putAll(ret)
236+
}
237+
210238
for {
211239
select {
212240
case <-cb.stop:
213241
return
214242
case <-cb.ticker.C:
215243
if cb.refresh != nil {
216-
log.Printf("refresh cache\n")
217-
ret, err := cb.refresh()
218-
if err != nil {
219-
continue
220-
}
221-
cb.putAll(ret)
244+
load()
222245
}
223246
}
224247
}
225248
}
226249

250+
// putAll 写入缓存数据,如果给定空数据,也会写入;如果想要不写入,请在 putAll 前判断 len(ret) <= 0
227251
func (cb *loadableCache[K, V]) putAll(ret map[K]V) bool {
228252
// if ret len is zero, keep cache data
229253
// Tips: if you want to clear cache data, you can use cb.Purge()
230-
if len(ret) <= 0 {
231-
return false
232-
}
233-
254+
// TIPS: remove by 2025-05-22
255+
// if len(ret) <= 0 {
256+
// return false
257+
// }
234258
cb.mu.Lock()
235259
defer cb.mu.Unlock()
236260

kratos/caching/loadable_cache_test.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ func fakeRefresh() (map[int64]string, error) {
2727
}
2828

2929
func TestBaseCache(t *testing.T) {
30-
31-
//var c = 1
30+
// var c = 1
3231

3332
cc := caching.New(
3433
caching.WithTracing[int64, string](otel.GetTracerProvider()),
@@ -182,7 +181,7 @@ func TestConcurrent(t *testing.T) {
182181

183182
for i := 0; i < 20; i++ {
184183
go func() {
185-
for j:=range 100 {
184+
for j := range 100 {
186185
t.Logf("i: %d -> j: %d", i, j)
187186
kvs := cc.GetALL(ctx)
188187
if len(kvs) != 2 {
@@ -194,6 +193,43 @@ func TestConcurrent(t *testing.T) {
194193
}()
195194
}
196195

197-
198196
wg.Wait()
199-
}
197+
}
198+
199+
func TestFirstLoadErr(t *testing.T) {
200+
cnt := 0
201+
cc := caching.New(
202+
caching.WithSize[int64, string](100),
203+
caching.WithExpiration[int64, string](time.Second), // 每2秒刷新一次缓存
204+
caching.WithRetryCount[int64, string](3),
205+
caching.WithRefreshAfterWrite(func() (map[int64]string, error) {
206+
cnt++
207+
if cnt < 3 {
208+
return map[int64]string{
209+
1: "1",
210+
2: "2",
211+
}, nil
212+
}
213+
return map[int64]string{}, errors.New("error")
214+
}), // 每次请求刷出 0 个 kv
215+
caching.WithBlock[int64, string](),
216+
)
217+
218+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Microsecond)
219+
defer cancel()
220+
221+
kvs := cc.GetALL(ctx)
222+
assert.Equal(t, 2, len(kvs))
223+
224+
time.Sleep(time.Second * 3)
225+
kvs = cc.GetALL(ctx)
226+
assert.Equal(t, 2, len(kvs))
227+
228+
time.Sleep(time.Second * 3)
229+
kvs = cc.GetALL(ctx)
230+
assert.Equal(t, 0, len(kvs))
231+
232+
time.Sleep(time.Second * 3)
233+
kvs = cc.GetALL(ctx)
234+
assert.Equal(t, 0, len(kvs))
235+
}

0 commit comments

Comments
 (0)