Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions asynq.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@ type RedisClientOpt struct {
// See: https://redis.io/commands/select.
DB int

// KeyPrefix is the Redis key prefix for all Asynq operations.
// This allows you to namespace Asynq keys for different environments.
// If empty, the default prefix "asynq" will be used.
// Examples: "asynq-prod", "asynq-dev", "asynq-staging"
KeyPrefix string

// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
Expand Down Expand Up @@ -315,6 +321,9 @@ type RedisClientOpt struct {
}

func (opt RedisClientOpt) MakeRedisClient() interface{} {
// Apply key prefix if specified (preserves existing behavior if not specified)
base.ApplyKeyPrefix(opt.KeyPrefix)

return redis.NewClient(&redis.Options{
Network: opt.Network,
Addr: opt.Addr,
Expand Down Expand Up @@ -359,6 +368,12 @@ type RedisFailoverClientOpt struct {
// See: https://redis.io/commands/select.
DB int

// KeyPrefix is the Redis key prefix for all Asynq operations.
// This allows you to namespace Asynq keys for different environments.
// If empty, the default prefix "asynq" will be used.
// Examples: "asynq-prod", "asynq-dev", "asynq-staging"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also describe what happens if someone creates multiple clients while setting different prefixes for all of them including a potential empty string.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each client instance is designed for a specific use case with its own prefix, so they won't interfere with each other's logic. This actually enhances isolation between different environments or services. If the prefix is an empty string, it defaults to the original "asynq" prefix, maintaining backward compatibility.

KeyPrefix string

// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
Expand Down Expand Up @@ -389,6 +404,9 @@ type RedisFailoverClientOpt struct {
}

func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
// Apply key prefix if specified (preserves existing behavior if not specified)
base.ApplyKeyPrefix(opt.KeyPrefix)

return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: opt.MasterName,
SentinelAddrs: opt.SentinelAddrs,
Expand Down Expand Up @@ -447,9 +465,18 @@ type RedisClusterClientOpt struct {
// TLS Config used to connect to a server.
// TLS will be negotiated only if this field is set.
TLSConfig *tls.Config

// KeyPrefix is the Redis key prefix for all Asynq operations.
// If empty, the default prefix "asynq" will be used.
// This allows you to namespace Asynq keys for different environments
// (e.g., "asynq-prod", "asynq-dev", "asynq-staging").
KeyPrefix string
}

func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// Apply key prefix if specified (preserves existing behavior if not specified)
base.ApplyKeyPrefix(opt.KeyPrefix)

return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: opt.Addrs,
MaxRedirects: opt.MaxRedirects,
Expand Down
69 changes: 57 additions & 12 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,55 @@ const Version = "0.25.1"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"

// DefaultKeyPrefix is the default Redis key prefix used if none is specified.
const DefaultKeyPrefix = "asynq"

// Global variable to store the current key prefix
var (
keyPrefix = DefaultKeyPrefix
keyMutex = sync.RWMutex{}
)

// SetKeyPrefix sets the global Redis key prefix for all Asynq operations.
func SetKeyPrefix(prefix string) {
keyMutex.Lock()
defer keyMutex.Unlock()
if prefix == "" {
keyPrefix = DefaultKeyPrefix
} else {
keyPrefix = prefix
}
}

// GetKeyPrefix returns the current Redis key prefix.
func GetKeyPrefix() string {
keyMutex.RLock()
defer keyMutex.RUnlock()
return keyPrefix
}

// DefaultQueue is the redis key for the default queue.
var DefaultQueue = PendingKey(DefaultQueueName)

// Global Redis keys.
const (
AllServers = "asynq:servers" // ZSET
AllWorkers = "asynq:workers" // ZSET
AllSchedulers = "asynq:schedulers" // ZSET
AllQueues = "asynq:queues" // SET
CancelChannel = "asynq:cancel" // PubSub channel
// getGlobalKey returns a global Redis key with the current prefix.
func getGlobalKey(suffix string) string {
return GetKeyPrefix() + ":" + suffix
}

// Global Redis keys functions that return dynamic keys based on current prefix.
func AllServersKey() string { return getGlobalKey("servers") } // ZSET
func AllWorkersKey() string { return getGlobalKey("workers") } // ZSET
func AllSchedulersKey() string { return getGlobalKey("schedulers") } // ZSET
func AllQueuesKey() string { return getGlobalKey("queues") } // SET
func CancelChannelKey() string { return getGlobalKey("cancel") } // PubSub channel

// Legacy global key variables for backward compatibility - these will be deprecated
var (
AllServers = AllServersKey()
AllWorkers = AllWorkersKey()
AllSchedulers = AllSchedulersKey()
AllQueues = AllQueuesKey()
CancelChannel = CancelChannelKey()
)

// TaskState denotes the state of a task.
Expand Down Expand Up @@ -104,7 +143,7 @@ func ValidateQueueName(qname string) error {

// QueueKeyPrefix returns a prefix for all keys in the given queue.
func QueueKeyPrefix(qname string) string {
return "asynq:{" + qname + "}:"
return GetKeyPrefix() + ":{" + qname + "}:"
}

// TaskKeyPrefix returns a prefix for task key.
Expand Down Expand Up @@ -178,22 +217,22 @@ func FailedKey(qname string, t time.Time) string {

// ServerInfoKey returns a redis key for process info.
func ServerInfoKey(hostname string, pid int, serverID string) string {
return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, serverID)
return fmt.Sprintf("%s:servers:{%s:%d:%s}", GetKeyPrefix(), hostname, pid, serverID)
}

// WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
func WorkersKey(hostname string, pid int, serverID string) string {
return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, serverID)
return fmt.Sprintf("%s:workers:{%s:%d:%s}", GetKeyPrefix(), hostname, pid, serverID)
}

// SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID.
func SchedulerEntriesKey(schedulerID string) string {
return "asynq:schedulers:{" + schedulerID + "}"
return GetKeyPrefix() + ":schedulers:{" + schedulerID + "}"
}

// SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.
func SchedulerHistoryKey(entryID string) string {
return "asynq:scheduler_history:" + entryID
return GetKeyPrefix() + ":scheduler_history:" + entryID
}

// UniqueKey returns a redis key with the given type, payload, and queue name.
Expand Down Expand Up @@ -728,3 +767,9 @@ type Broker interface {

WriteResult(qname, id string, data []byte) (n int, err error)
}

func ApplyKeyPrefix(prefix string) {
if prefix != "" {
SetKeyPrefix(prefix)
}
}
Loading