Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@ import (

const statsTTL = 90 * 24 * time.Hour // 90 days

// LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
const LeaseDuration = 30 * time.Second
// DefaultLeaseDuration is the default lease duration (30 seconds)
const DefaultLeaseDuration = 30 * time.Second

// RDB is a client interface to query and mutate task queues.
type RDB struct {
client redis.UniversalClient
clock timeutil.Clock
queuesPublished sync.Map
leaseDuration time.Duration // configurable lease duration
}

// NewRDB returns a new instance of RDB.
func NewRDB(client redis.UniversalClient) *RDB {
return &RDB{
client: client,
clock: timeutil.NewRealClock(),
client: client,
clock: timeutil.NewRealClock(),
leaseDuration: DefaultLeaseDuration,
}
}

Expand All @@ -57,6 +59,15 @@ func (r *RDB) SetClock(c timeutil.Clock) {
r.clock = c
}

// SetLeaseDuration sets the lease duration for this RDB instance.
// If d is zero or negative, the default duration (30s) is used.
func (r *RDB) SetLeaseDuration(d time.Duration) {
if d <= 0 {
d = DefaultLeaseDuration
}
r.leaseDuration = d
}

// Ping checks the connection with redis server.
func (r *RDB) Ping() error {
return r.client.Ping(context.Background()).Err()
Expand Down Expand Up @@ -250,7 +261,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT
base.ActiveKey(qname),
base.LeaseKey(qname),
}
leaseExpirationTime = r.clock.Now().Add(LeaseDuration)
leaseExpirationTime = r.clock.Now().Add(r.leaseDuration)
argv := []interface{}{
leaseExpirationTime.Unix(),
base.TaskKeyPrefix(qname),
Expand Down Expand Up @@ -1352,10 +1363,10 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task
return msgs, nil
}

// ExtendLease extends the lease for the given tasks by LeaseDuration (30s).
// ExtendLease extends the lease for the given tasks by the configured lease duration.
// It returns a new expiration time if the operation was successful.
func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) {
expireAt := r.clock.Now().Add(LeaseDuration)
expireAt := r.clock.Now().Add(r.leaseDuration)
var zs []redis.Z
for _, id := range ids {
zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())})
Expand Down
50 changes: 48 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,25 @@ type Config struct {
// If unset or zero, default batch size of 100 is used.
// Make sure to not put a big number as the batch size to prevent a long-running script.
JanitorBatchSize int

// LeaseDuration specifies the duration for which a worker can hold a lease on a task.
// The worker must extend the lease by sending heartbeat to the server before it expires.
//
// Longer lease duration provides better tolerance for network instability (e.g., cross-region VPS deployments)
// but delays task recovery when a worker crashes.
//
// Recommended range: 30s to 120s
// If unset or zero, default duration of 30 seconds is used.
LeaseDuration time.Duration

// HeartbeatInterval specifies the interval between heartbeats sent by the server to extend task leases.
//
// Shorter intervals provide faster detection of worker failures but increase Redis load.
// LeaseDuration should be at least 3x HeartbeatInterval to allow for network failures.
//
// Recommended range: 3s to 10s
// If unset or zero, default interval of 5 seconds is used.
HeartbeatInterval time.Duration
}

// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
Expand Down Expand Up @@ -503,7 +522,34 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
logger.SetLevel(toInternalLogLevel(loglevel))

// Process LeaseDuration configuration
leaseDuration := cfg.LeaseDuration
if leaseDuration <= 0 {
leaseDuration = rdb.DefaultLeaseDuration // use constant from rdb package
}
if leaseDuration < 10*time.Second || leaseDuration > 5*time.Minute {
logger.Warnf("LeaseDuration %v is out of recommended range [10s, 5m]", leaseDuration)
}

// Process HeartbeatInterval configuration
heartbeatInterval := cfg.HeartbeatInterval
if heartbeatInterval <= 0 {
heartbeatInterval = 5 * time.Second // default
}
if heartbeatInterval < 1*time.Second {
logger.Warnf("HeartbeatInterval %v is too short, may cause high Redis load", heartbeatInterval)
}
if heartbeatInterval > 30*time.Second {
logger.Warnf("HeartbeatInterval %v is too long, may delay lease expiration detection", heartbeatInterval)
}

// Validate: LeaseDuration should be at least 3x HeartbeatInterval
if leaseDuration < heartbeatInterval*3 {
panic(fmt.Sprintf("LeaseDuration (%v) must be at least 3x HeartbeatInterval (%v)", leaseDuration, heartbeatInterval))
}

rdb := rdb.NewRDB(c)
rdb.SetLeaseDuration(leaseDuration)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
Expand All @@ -513,12 +559,12 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
syncer := newSyncer(syncerParams{
logger: logger,
requestsCh: syncCh,
interval: 5 * time.Second,
interval: heartbeatInterval,
})
heartbeater := newHeartbeater(heartbeaterParams{
logger: logger,
broker: rdb,
interval: 5 * time.Second,
interval: heartbeatInterval,
concurrency: n,
queues: queues,
strictPriority: cfg.StrictPriority,
Expand Down
Loading