Skip to content

Commit 77c9cee

Browse files
jrickdavecgh
authored andcommitted
peer: Synchronize net.Conn with a mutex
The access guarded by an atomic int32 was incorrect. For example, access to the p.conn could be performed as long as p.connected was non-zero, but p.connected would be incremented before p.conn was ever assigned by AssociateConnection. While here, also add missing mutex locking protecting timeConnected and na.
1 parent fc664c0 commit 77c9cee

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

peer/peer.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -460,10 +460,10 @@ type Peer struct {
460460
bytesSent uint64
461461
lastRecv int64
462462
lastSend int64
463-
connected int32
464463
disconnect int32
465464

466-
conn net.Conn
465+
conn net.Conn
466+
connMtx sync.Mutex
467467

468468
// blake256Hasher is the hash.Hash object that is used by readMessage
469469
// to calculate the hash of read mixing messages. Every peer's hasher
@@ -1918,23 +1918,27 @@ func (p *Peer) QueueInventoryImmediate(invVect *wire.InvVect) {
19181918
//
19191919
// This function is safe for concurrent access.
19201920
func (p *Peer) Connected() bool {
1921-
return atomic.LoadInt32(&p.connected) != 0 &&
1922-
atomic.LoadInt32(&p.disconnect) == 0
1921+
p.connMtx.Lock()
1922+
defer p.connMtx.Unlock()
1923+
1924+
return p.conn != nil && atomic.LoadInt32(&p.disconnect) == 0
19231925
}
19241926

19251927
// Disconnect disconnects the peer by closing the connection. Calling this
19261928
// function when the peer is already disconnected or in the process of
19271929
// disconnecting will have no effect.
19281930
func (p *Peer) Disconnect() {
1929-
if atomic.AddInt32(&p.disconnect, 1) != 1 {
1930-
return
1931-
}
1931+
p.connMtx.Lock()
1932+
defer p.connMtx.Unlock()
19321933

19331934
log.Tracef("Disconnecting %s", p)
1934-
if atomic.LoadInt32(&p.connected) != 0 {
1935+
if p.conn != nil {
19351936
p.conn.Close()
19361937
}
1937-
close(p.quit)
1938+
1939+
if atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
1940+
close(p.quit)
1941+
}
19381942
}
19391943

19401944
// readRemoteVersionMsg waits for the next message to arrive from the remote
@@ -2146,13 +2150,20 @@ func (p *Peer) start() error {
21462150
// Calling this function when the peer is already connected will
21472151
// have no effect.
21482152
func (p *Peer) AssociateConnection(conn net.Conn) {
2153+
p.connMtx.Lock()
2154+
21492155
// Already connected?
2150-
if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
2156+
if p.conn != nil {
2157+
p.connMtx.Unlock()
21512158
return
21522159
}
21532160

21542161
p.conn = conn
2162+
p.connMtx.Unlock()
2163+
2164+
p.statsMtx.Lock()
21552165
p.timeConnected = time.Now()
2166+
p.statsMtx.Unlock()
21562167

21572168
if p.inbound {
21582169
p.addr = p.conn.RemoteAddr().String()
@@ -2166,7 +2177,9 @@ func (p *Peer) AssociateConnection(conn net.Conn) {
21662177
p.Disconnect()
21672178
return
21682179
}
2180+
p.flagsMtx.Lock()
21692181
p.na = na
2182+
p.flagsMtx.Unlock()
21702183
}
21712184

21722185
go func(peer *Peer) {

0 commit comments

Comments
 (0)