Skip to content

Unsubscribe 方法可能导致死锁(close 时重复加锁) #124

@liangleo

Description

@liangleo

问题描述

broker/kafka/subscriber.go 文件中,Unsubscribe 方法在持有锁(s.Lock())的情况下调用了 s.close(),而 close() 方法内部也执行了一次 s.Lock(),这会导致互斥锁的二次加锁,从而造成死锁。

核心复现代码片段:

func (s *subscriber) Unsubscribe(removeFromManager bool) error {
    s.Lock()
    defer s.Unlock()
    err := s.close()
    ...
}

func (s *subscriber) close() error {
    s.Lock()
    defer s.Unlock()
    ...
}

当 Unsubscribe 调用 close 时,会死锁(Go 的 sync.Mutex 为不可重入锁)。

对比如下,其他消息中间件的 subscriber 实现不会导致锁重入死锁:
比如 broker/redis/subscriber.go:

func (s *subscriber) Unsubscribe(removeFromManager bool) error {
    s.Lock()
    defer s.Unlock()
    s.closed = true
    var err error
    if s.conn != nil {
        err = s.conn.Unsubscribe()
    }
    if s.b != nil && s.b.subscribers != nil && removeFromManager {
        _ = s.b.subscribers.RemoveOnly(s.topic)
    }
    return err
}

和 broker/rabbitmq/subscriber.go:

func (s *subscriber) Unsubscribe(removeFromManager bool) error {
    s.Lock()
    defer s.Unlock()
    s.closed = true
    var err error
    if s.ch != nil {
        err = s.ch.Close()
    }
    if s.r != nil && s.r.subscribers != nil && removeFromManager {
        _ = s.r.subscribers.RemoveOnly(s.topic)
    }
    return err
}

不会在 subscriber 内部重复加锁。

复现方式

  1. 创建并运行 kafka subscriber,调用 subscriber.Unsubscribe(true)
  2. 程序阻塞卡死(调用栈会显示在 Lock() 处)

期望行为

  • Unsubscribe 能正常注销订阅,无死锁

建议修复

Unsubscribe 不需要持锁直接调用 close,直接:

func (s *subscriber) Unsubscribe(removeFromManager bool) error {
    err := s.close()
    if s.b != nil && s.b.subscribers != nil && removeFromManager {
        _ = s.b.subscribers.RemoveOnly(s.topic)
    }
    return err
}

影响版本

  • commit: 996a7bb
  • 文件路径: broker/kafka/subscriber.go

相关资料

请开发者确认修复该死锁隐患,谢谢!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions