Skip to content

Commit 317ce76

Browse files
authored
Merge pull request #449 from tonkeeper/fix_liteapi_pool
fix memory leak
2 parents c94580b + 50f9a9d commit 317ce76

File tree

4 files changed

+347
-29
lines changed

4 files changed

+347
-29
lines changed

liteapi/pool/conn_pool.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,16 +360,27 @@ func (p *ConnPool) WaitMasterchainSeqno(ctx context.Context, seqno uint32, timeo
360360
waitID, ch := p.subscribe(seqno)
361361
defer p.unsubscribe(waitID)
362362

363+
timer := time.NewTimer(timeout)
364+
defer timer.Stop()
365+
363366
for {
364367
select {
365368
case <-ctx.Done():
366369
return ctx.Err()
367-
case <-time.After(timeout):
370+
case <-timer.C:
368371
return fmt.Errorf("timeout")
369372
case head := <-ch:
370373
if head.Seqno >= seqno {
371374
return nil
372375
}
376+
// Reset timer for next iteration
377+
if !timer.Stop() {
378+
select {
379+
case <-timer.C:
380+
default:
381+
}
382+
}
383+
timer.Reset(timeout)
373384
}
374385
}
375386
}

liteapi/pool/conn_pool_test.go

Lines changed: 172 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package pool
22

33
import (
44
"context"
5+
"runtime"
6+
"sync"
7+
"sync/atomic"
58
"testing"
69
"time"
710

@@ -25,14 +28,15 @@ func (m *mockConn) IsArchiveNode() bool {
2528
}
2629

2730
func (m *mockConn) ID() int {
28-
return 0
31+
return m.id
2932
}
3033

3134
func (m *mockConn) MasterHead() ton.BlockIDExt {
3235
return ton.BlockIDExt{BlockID: ton.BlockID{Seqno: m.seqno}}
3336
}
3437

3538
func (m *mockConn) SetMasterHead(ext ton.BlockIDExt) {
39+
m.seqno = ext.Seqno
3640
}
3741

3842
func (m *mockConn) MasterSeqno() uint32 {
@@ -156,8 +160,174 @@ func TestConnPool_updateBest(t *testing.T) {
156160
p.updateBest()
157161
c := p.bestConnection().(*mockConn)
158162
if tt.wantID != c.id {
159-
t.Fatalf("want connection id: %v, got: %v", tt.wantID, c.id)
163+
t.Fatalf("expected connection id %d, got %d", tt.wantID, c.id)
160164
}
161165
})
162166
}
163167
}
168+
169+
type mockConnWithClient struct {
170+
*connection
171+
mockIsOK bool
172+
}
173+
174+
func (m *mockConnWithClient) IsOK() bool {
175+
return m.mockIsOK
176+
}
177+
178+
func TestWaitMasterchainSeqno(t *testing.T) {
179+
t.Run("timer reuse prevents memory leak", func(t *testing.T) {
180+
pool := New(BestPingStrategy)
181+
182+
ctx, cancel := context.WithCancel(context.Background())
183+
defer cancel()
184+
185+
go pool.Run(ctx)
186+
187+
testConn := &connection{
188+
id: 1,
189+
masterHeadUpdatedCh: pool.masterHeadUpdatedCh,
190+
}
191+
testConn.SetMasterHead(ton.BlockIDExt{BlockID: ton.BlockID{Seqno: 1}})
192+
193+
wrappedConn := &mockConnWithClient{
194+
connection: testConn,
195+
mockIsOK: true,
196+
}
197+
198+
pool.mu.Lock()
199+
pool.conns = []conn{wrappedConn}
200+
pool.bestConn = wrappedConn
201+
pool.mu.Unlock()
202+
203+
go func() {
204+
for i := uint32(2); i < 50; i++ {
205+
time.Sleep(10 * time.Millisecond)
206+
testConn.SetMasterHead(ton.BlockIDExt{BlockID: ton.BlockID{Seqno: i}})
207+
}
208+
}()
209+
210+
var m1, m2 runtime.MemStats
211+
runtime.GC()
212+
runtime.ReadMemStats(&m1)
213+
214+
for i := uint32(2); i < 45; i++ {
215+
err := pool.WaitMasterchainSeqno(ctx, i, 500*time.Millisecond)
216+
if err != nil {
217+
t.Fatalf("failed to wait for seqno %d: %v", i, err)
218+
}
219+
}
220+
221+
runtime.GC()
222+
runtime.ReadMemStats(&m2)
223+
224+
allocatedKB := (m2.TotalAlloc - m1.TotalAlloc) / 1024
225+
if allocatedKB > 500 {
226+
t.Errorf("excessive memory allocation: %d KB (expected < 500 KB with timer reuse)", allocatedKB)
227+
}
228+
})
229+
230+
t.Run("timeout when seqno not reached", func(t *testing.T) {
231+
pool := New(BestPingStrategy)
232+
233+
ctx := context.Background()
234+
go pool.Run(ctx)
235+
236+
testConn := &connection{
237+
id: 1,
238+
masterHeadUpdatedCh: pool.masterHeadUpdatedCh,
239+
}
240+
testConn.SetMasterHead(ton.BlockIDExt{BlockID: ton.BlockID{Seqno: 1}})
241+
242+
wrappedConn := &mockConnWithClient{
243+
connection: testConn,
244+
mockIsOK: true,
245+
}
246+
247+
pool.mu.Lock()
248+
pool.conns = []conn{wrappedConn}
249+
pool.bestConn = wrappedConn
250+
pool.mu.Unlock()
251+
252+
err := pool.WaitMasterchainSeqno(ctx, 9999, 100*time.Millisecond)
253+
if err == nil {
254+
t.Fatal("expected timeout error, got nil")
255+
}
256+
if err.Error() != "timeout" {
257+
t.Fatalf("expected 'timeout' error, got %q", err.Error())
258+
}
259+
})
260+
}
261+
262+
func TestMultipleConnectionsChannelHandling(t *testing.T) {
263+
pool := New(BestPingStrategy)
264+
265+
ctx, cancel := context.WithCancel(context.Background())
266+
defer cancel()
267+
268+
go pool.Run(ctx)
269+
270+
var wrappedConns []*mockConnWithClient
271+
for i := 0; i < 5; i++ {
272+
c := &connection{
273+
id: i,
274+
masterHeadUpdatedCh: pool.masterHeadUpdatedCh,
275+
}
276+
wrapped := &mockConnWithClient{
277+
connection: c,
278+
mockIsOK: true,
279+
}
280+
wrappedConns = append(wrappedConns, wrapped)
281+
}
282+
283+
pool.mu.Lock()
284+
pool.conns = make([]conn, len(wrappedConns))
285+
for i, c := range wrappedConns {
286+
pool.conns[i] = c
287+
}
288+
pool.bestConn = wrappedConns[0]
289+
pool.mu.Unlock()
290+
291+
var blocked atomic.Int32
292+
var wg sync.WaitGroup
293+
294+
for idx, wrapped := range wrappedConns {
295+
wg.Add(1)
296+
go func(connID int, conn *connection) {
297+
defer wg.Done()
298+
299+
for seqno := uint32(1); seqno <= 20; seqno++ {
300+
done := make(chan bool, 1)
301+
302+
go func(s uint32) {
303+
conn.SetMasterHead(ton.BlockIDExt{BlockID: ton.BlockID{Seqno: s}})
304+
done <- true
305+
}(seqno)
306+
307+
select {
308+
case <-done:
309+
case <-time.After(100 * time.Millisecond):
310+
blocked.Add(1)
311+
return
312+
}
313+
314+
time.Sleep(5 * time.Millisecond)
315+
}
316+
}(idx, wrapped.connection)
317+
}
318+
319+
wg.Wait()
320+
321+
if blocked.Load() > 0 {
322+
t.Errorf("failed to send updates: %d connection(s) blocked (channel overflow)", blocked.Load())
323+
}
324+
}
325+
326+
func TestChannelBufferCapacity(t *testing.T) {
327+
pool := New(BestPingStrategy)
328+
329+
capacity := cap(pool.masterHeadUpdatedCh)
330+
if capacity < 10 {
331+
t.Errorf("insufficient channel buffer: got %d, expected >= 10", capacity)
332+
}
333+
}

liteapi/pool/connection.go

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,20 @@ func (c *connection) Run(ctx context.Context, detectArchive bool) {
4646
for {
4747
var head ton.BlockIDExt
4848
for {
49+
select {
50+
case <-ctx.Done():
51+
return
52+
default:
53+
}
54+
4955
res, err := c.client.LiteServerGetMasterchainInfo(ctx)
5056
if err != nil {
51-
// TODO: log error
52-
time.Sleep(1000 * time.Millisecond)
53-
continue
57+
select {
58+
case <-ctx.Done():
59+
return
60+
case <-time.After(1000 * time.Millisecond):
61+
continue
62+
}
5463
}
5564
head = res.Last.ToBlockIdExt()
5665
break
@@ -59,11 +68,15 @@ func (c *connection) Run(ctx context.Context, detectArchive bool) {
5968
for {
6069
res, err := c.client.WaitMasterchainBlock(ctx, head.Seqno+1, 15_000)
6170
if err != nil {
62-
// TODO: log error
63-
time.Sleep(1000 * time.Millisecond)
64-
// we want to request seqno again with LiteServerGetMasterchainInfo
65-
// to avoid situation when this server has been offline for too long,
66-
// and it doesn't contain a block with the latest known seqno anymore.
71+
select {
72+
case <-ctx.Done():
73+
return
74+
case <-time.After(1000 * time.Millisecond):
75+
// we want to request seqno again with LiteServerGetMasterchainInfo
76+
// to avoid situation when this server has been offline for too long,
77+
// and it doesn't contain a block with the latest known seqno anymore.
78+
break
79+
}
6780
break
6881
}
6982
if ctx.Err() != nil {
@@ -96,13 +109,20 @@ func (c *connection) MasterHead() ton.BlockIDExt {
96109

97110
func (c *connection) SetMasterHead(head ton.BlockIDExt) {
98111
c.mu.Lock()
99-
defer c.mu.Unlock()
100-
if head.Seqno > c.masterHead.Seqno {
101-
c.masterHead = head
102-
c.masterHeadUpdatedCh <- masterHeadUpdated{
103-
Head: head,
104-
Conn: c,
105-
}
112+
if head.Seqno <= c.masterHead.Seqno {
113+
c.mu.Unlock()
114+
return
115+
}
116+
c.masterHead = head
117+
c.mu.Unlock()
118+
119+
select {
120+
case c.masterHeadUpdatedCh <- masterHeadUpdated{
121+
Head: head,
122+
Conn: c,
123+
}:
124+
default:
125+
// Channel full - skip notification, pool will catch up on next update
106126
}
107127
}
108128

0 commit comments

Comments
 (0)