Skip to content

Commit f5e29a4

Browse files
committed
fix high limit flow
1 parent 775a7e3 commit f5e29a4

File tree

2 files changed

+63
-23
lines changed

2 files changed

+63
-23
lines changed

operation.go

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,35 +30,41 @@ func NewOperation(ctx context.Context, reader bool) (op *Operation) {
3030
func (op *Operation) run(ctx context.Context, ch chan<- struct{}) {
3131
defer close(ch)
3232
seccount := 0
33-
now := time.Now()
33+
counts := make([]int32, secparts)
34+
tckr := time.NewTicker(interval)
35+
defer tckr.Stop()
3436

3537
for {
36-
if elapsed := time.Since(now); elapsed > 0 {
37-
now = now.Add(elapsed)
38-
39-
if limit := op.Limit.Load(); limit > 0 {
40-
todo := max(1, limit/secparts)
41-
batch := min(1024, todo)
42-
op.batch.Store(batch)
43-
for todo >= batch && time.Since(now) < (interval-(interval/10)) {
38+
if limit := op.Limit.Load(); limit > 0 {
39+
todo := max(1, limit/secparts)
40+
batch := min(1024, todo)
41+
op.batch.Store(batch)
42+
drive:
43+
for {
44+
select {
45+
case <-ctx.Done():
46+
return
47+
case ch <- struct{}{}:
48+
todo -= batch
4449
todo += op.avail.Swap(0)
45-
select {
46-
case <-ctx.Done():
47-
return
48-
case ch <- struct{}{}:
49-
todo -= batch
50-
default:
51-
time.Sleep(interval / 10)
50+
if todo < batch {
51+
<-tckr.C
52+
break drive
5253
}
54+
case <-tckr.C:
55+
break drive
5356
}
5457
}
55-
56-
time.Sleep(interval - time.Since(now))
58+
counts[seccount] = op.count.Swap(0)
5759
seccount++
5860
if seccount >= secparts {
5961
seccount = 0
60-
op.Rate.Store(op.count.Swap(0))
6162
}
63+
var rate int32
64+
for i := 0; i < secparts; i++ {
65+
rate += counts[i]
66+
}
67+
op.Rate.Store(rate)
6268
}
6369
}
6470
}

operation_test.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestOperation_io_read_limit(t *testing.T) {
6262
}
6363
}
6464

65-
func TestOperation_read_rate(t *testing.T) {
65+
func TestOperation_read_rate_low(t *testing.T) {
6666
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
6767
defer cancel()
6868
l := NewLimiter(ctx)
@@ -89,6 +89,36 @@ func TestOperation_read_rate(t *testing.T) {
8989
}
9090
}
9191

92+
func TestOperation_read_rate_high(t *testing.T) {
93+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
94+
defer cancel()
95+
l := NewLimiter(ctx)
96+
97+
const numbytes = 10 * 1000000
98+
l.Reads.Limit.Store(numbytes)
99+
100+
now := time.Now()
101+
r := bytes.NewReader(make([]byte, numbytes*2))
102+
var numread int
103+
104+
for numread < numbytes {
105+
buf := make([]byte, 1000)
106+
n, err := l.Reads.io(r.Read, buf)
107+
numread += n
108+
if err != nil {
109+
t.Error(numread, n)
110+
t.Fatal(err)
111+
}
112+
}
113+
114+
if elapsed := time.Since(now); elapsed < time.Millisecond*900 || elapsed > time.Millisecond*1100 {
115+
t.Error(elapsed)
116+
}
117+
if rate := int(l.Reads.Rate.Load()); rate < numbytes*0.9 || rate > numbytes {
118+
t.Error(rate)
119+
}
120+
}
121+
92122
func TestOperation_write_rate(t *testing.T) {
93123
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
94124
defer cancel()
@@ -106,10 +136,14 @@ func TestOperation_write_rate(t *testing.T) {
106136
t.Error(err)
107137
}
108138

109-
for l.Writes.Rate.Load() == 0 && ctx.Err() == nil {
110-
time.Sleep(time.Second / 100)
139+
rate := l.Writes.Rate.Load()
140+
for rate == 0 && ctx.Err() == nil {
141+
rate = l.Writes.Rate.Load()
142+
}
143+
if ctx.Err() != nil {
144+
t.Fatal(ctx.Err())
111145
}
112-
if rate := l.Writes.Rate.Load(); rate != 10 {
146+
if rate != 10 {
113147
t.Error(rate)
114148
}
115149
}

0 commit comments

Comments
 (0)