Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,21 @@ Priority order:
<details>
<summary><b>Resources Configurations</b></summary>

| Setting | Default | Required | Description |
|------------------------------|:--------:|:--------:|----------------------------------------------------------------------------------------------------|
| resources.maxSize | 67108864 | No | Max allowed size in bytes for a resource. |
| resources.maxSizeToCache | 1048576 | No | Max size in bytes for a resource to cache in Redis. |
| resources.syncPeriod | 60000 | No | Period in milliseconds, how frequently check for resources to sync. |
| resources.syncDelay | 120000 | No | Delay in milliseconds for a resource to be written back in object storage after last modification. |
| resources.syncBatch | 4096 | No | How many resources to sync in one go. |
| resources.cacheExpiration | 300000 | No | Expiration in milliseconds for synced resources in Redis. |
| resources.compressionMinSize | 256 | No | Compress a resource with gzip if its size in bytes more or equal to this value. |
| Setting | Default | Required | Description |
|-----------------------------------------|:--------:|:--------:|----------------------------------------------------------------------------------------------------|
| resources.maxSize | 67108864 | No | Max allowed size in bytes for a resource. |
| resources.maxSizeToCache | 1048576 | No | Max size in bytes for a resource to cache in Redis. |
| resources.syncPeriod | 60000 | No | Period in milliseconds, how frequently check for resources to sync. |
| resources.syncDelay | 120000 | No | Delay in milliseconds for a resource to be written back in object storage after last modification. |
| resources.syncBatch | 4096 | No | How many resources to sync in one go. |
| resources.cacheExpiration | 300000 | No | Expiration in milliseconds for synced resources in Redis. |
| resources.compressionMinSize | 256 | No | Compress a resource with gzip if its size in bytes more or equal to this value. |
| resources.resourceTypesExpiration | | No | Define expiration time per resource type in milliseconds |
| resources.resourceTypesExpiration.FILE | | 300000 | Define expiration time for files |
| resources.resourceTypesExpiration.CONVERSATION | | 300000 | Define expiration time for converations |
| resources.resourceTypesExpiration.PROMPT | | 300000 | Define expiration time for prompts |
| resources.resourceTypesExpiration.APPLICATION | | Infinity | Define expiration time for applications |
| resources.resourceTypesExpiration.TOOL_SET | | Infinity | Define expiration time for toolsets |

</details>

Expand Down
14 changes: 13 additions & 1 deletion server/src/main/java/com/epam/aidial/core/server/AiDial.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.epam.aidial.core.storage.blobstore.BlobStorage;
import com.epam.aidial.core.storage.blobstore.Storage;
import com.epam.aidial.core.storage.cache.CacheClientFactory;
import com.epam.aidial.core.storage.resource.ResourceTypes;
import com.epam.aidial.core.storage.service.LockService;
import com.epam.aidial.core.storage.service.ResourceService;
import com.epam.aidial.core.storage.service.TimerService;
Expand Down Expand Up @@ -182,7 +183,7 @@ void start() throws Exception {

LockService lockService = new LockService(redis, storage.getPrefix());
TimerService timerService = new VertxTimerService(vertx, taskExecutor);
ResourceService.Settings resourceServiceSettings = Json.decodeValue(settings("resources").toBuffer(), ResourceService.Settings.class);
ResourceService.Settings resourceServiceSettings = getResourceSettings();
resourceService = new ResourceService(timerService, redis, storage, lockService, resourceServiceSettings, storage.getPrefix());
InvitationService invitationService = new InvitationService(resourceService, encryptionService, settings("invitations"));
ApiKeyStore apiKeyStore = new ApiKeyStore(taskExecutor, redis, storage.getPrefix(), settings("perRequestApiKey"));
Expand Down Expand Up @@ -260,6 +261,17 @@ void start() throws Exception {
}
}

private ResourceService.Settings getResourceSettings() {
ResourceService.Settings resourceServiceSettings = Json.decodeValue(settings("resources").toBuffer(), ResourceService.Settings.class);
Map<String, Long> resourceTypeExpiration = resourceServiceSettings.getResourceTypesExpiration();
for (ResourceTypes resourceType : ResourceTypes.values()) {
if (!resourceTypeExpiration.containsKey(resourceType.name())) {
resourceTypeExpiration.put(resourceType.name(), resourceType.ttl());
}
}
return resourceServiceSettings;
}

private static HttpProxySelector createHttpProxySelector(HttpClientOptions options) {
ProxyOptions proxyOptions = options.getProxyOptions();
if (proxyOptions == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ public interface ResourceType {
String group();

boolean requireCompression();

/**
* @return TTL in milliseconds.
*/
long ttl();
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
package com.epam.aidial.core.storage.resource;

import java.util.concurrent.TimeUnit;

public enum ResourceTypes implements ResourceType {
FILE("files", false), CONVERSATION("conversations", true),
PROMPT("prompts", true), LIMIT("limits", true),
SHARED_WITH_ME("shared_with_me", true), SHARED_BY_ME("shared_by_me", true), INVITATION("invitations", true),
PUBLICATION("publications", true), RULES("rules", true), API_KEY_DATA("api_key_data", true), NOTIFICATION("notifications", true),
APPLICATION("applications", true), DEPLOYMENT_COST_STATS("deployment_cost_stats", true),
CODE_INTERPRETER_SESSION("code_interpreter_session", true), USER_CONSENT("user_consent", true),
TOOL_SET("toolsets", true),
CREDENTIALS("credentials", true),
ENCRYPTION_KEYS("encryption_keys", true);

FILE("files", false, TimeUnit.MINUTES.toMillis(5)),
CONVERSATION("conversations", true, TimeUnit.MINUTES.toMillis(5)),
PROMPT("prompts", true, TimeUnit.MINUTES.toMillis(5)),
LIMIT("limits", true, TimeUnit.MINUTES.toMillis(5)),
SHARED_WITH_ME("shared_with_me", true, TimeUnit.MINUTES.toMillis(5)),
SHARED_BY_ME("shared_by_me", true, TimeUnit.MINUTES.toMillis(5)),
INVITATION("invitations", true, TimeUnit.MINUTES.toMillis(5)),
PUBLICATION("publications", true, TimeUnit.MINUTES.toMillis(5)),
RULES("rules", true, TimeUnit.MINUTES.toMillis(5)),
API_KEY_DATA("api_key_data", true, TimeUnit.MINUTES.toMillis(5)),
NOTIFICATION("notifications", true, TimeUnit.MINUTES.toMillis(5)),
APPLICATION("applications", true, Long.MAX_VALUE),
DEPLOYMENT_COST_STATS("deployment_cost_stats", true, TimeUnit.MINUTES.toMillis(5)),
CODE_INTERPRETER_SESSION("code_interpreter_session", true, TimeUnit.MINUTES.toMillis(5)),
USER_CONSENT("user_consent", true, TimeUnit.MINUTES.toMillis(5)),
TOOL_SET("toolsets", true, Long.MAX_VALUE),
CREDENTIALS("credentials", true, TimeUnit.MINUTES.toMillis(5)),
ENCRYPTION_KEYS("encryption_keys", true, TimeUnit.MINUTES.toMillis(5));

private final String group;
private final boolean requireCompression;
private final long ttl;

ResourceTypes(String group, boolean requireCompression) {
ResourceTypes(String group, boolean requireCompression, long ttl) {
this.group = group;
this.requireCompression = requireCompression;
this.ttl = ttl;
}

public static ResourceTypes of(String group) {
Expand Down Expand Up @@ -45,4 +59,10 @@ public boolean requireCompression() {
return requireCompression;
}

@Override
public long ttl() {
return ttl;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class ResourceService implements AutoCloseable {
private final int compressionMinSize;
private final String prefix;
private final String resourceQueue;
private final Map<String, Long> resourceTypeExpiration;

public ResourceService(TimerService timerService,
RedissonClient redis,
Expand All @@ -120,14 +122,14 @@ public ResourceService(TimerService timerService,
this.lockService = lockService;
this.topic = new ResourceTopic(redis, "resource:" + BlobStorageUtil.toStoragePath(prefix, "topic"));
this.maxSize = settings.maxSize;
this.maxSizeToCache = settings.maxSizeToCache();
this.maxSizeToCache = settings.maxSizeToCache;
this.syncDelay = settings.syncDelay;
this.syncBatch = settings.syncBatch;
this.cacheExpiration = Duration.ofMillis(settings.cacheExpiration);
this.compressionMinSize = settings.compressionMinSize;
this.prefix = prefix;
this.resourceQueue = "resource:" + BlobStorageUtil.toStoragePath(prefix, "queue");

this.resourceTypeExpiration = Objects.requireNonNullElseGet(settings.resourceTypesExpiration, Map::of);
this.syncTimer = timerService.scheduleWithFixedDelay(settings.syncPeriod, settings.syncPeriod, this::sync);
}

Expand Down Expand Up @@ -705,7 +707,8 @@ private RMap<String, byte[]> sync(String redisKey) {
long ttl = map.remainTimeToLive();
// according to the documentation, -1 means expiration is not set
if (ttl == -1) {
map.expire(cacheExpiration);
Duration expiration = getExpiration(redisKey);
map.expire(expiration);
}
redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE).remove(redisKey);
return map;
Expand Down Expand Up @@ -870,15 +873,17 @@ private void redisPut(String key, Result result) {
map.putAll(fields);

if (result.synced) { // cleanup because it is already synced
map.expire(cacheExpiration);
Duration expiration = getExpiration(key);
map.expire(expiration);
set.remove(key);
}
}

private RMap<String, byte[]> redisSync(String key) {
RMap<String, byte[]> map = redis.getMap(key, REDIS_MAP_CODEC);
map.put(SYNCED_ATTRIBUTE, RedisUtil.BOOLEAN_TRUE_ARRAY);
map.expire(cacheExpiration);
Duration expiration = getExpiration(key);
map.expire(expiration);

RScoredSortedSet<String> set = redis.getScoredSortedSet(resourceQueue, StringCodec.INSTANCE);
set.remove(key);
Expand Down Expand Up @@ -952,6 +957,15 @@ private static String extractEtag(BlobMetadata meta) {
return (etag == null) ? meta.getETag() : EtagHeader.quoteIfNeeded(etag);
}

private Duration getExpiration(String redisKey) {
String resourceType = RedisUtil.getResourceType(redisKey);
Long ttl = resourceTypeExpiration.get(resourceType);
if (ttl == null) {
return cacheExpiration;
}
return Duration.ofMillis(ttl);
}

@Builder
private record Result(
byte[] body,
Expand Down Expand Up @@ -1003,23 +1017,53 @@ private static ResourceStream fromResult(Result item, EtagHeader etagHeader) {
}
}

/**
* @param maxSize - max allowed size in bytes for a resource.
* @param maxSizeToCache - max size in bytes to cache resource in Redis.
* @param syncPeriod - period in milliseconds, how frequently check for resources to sync.
* @param syncDelay - delay in milliseconds for a resource to be written back in object storage after last modification.
* @param syncBatch - how many resources to sync in one go.
* @param cacheExpiration - expiration in milliseconds for synced resources in Redis.
* @param compressionMinSize - compress resources with gzip if their size in bytes more or equal to this value.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public record Settings(
int maxSize,
int maxSizeToCache,
long syncPeriod,
long syncDelay,
int syncBatch,
long cacheExpiration,
int compressionMinSize) {
@Data
public static class Settings {
/**
* Max allowed size in bytes for a resource.
*/
private int maxSize;
/**
* Max size in bytes to cache resource in Redis.
*/
private int maxSizeToCache;
/**
* Period in milliseconds, how frequently check for resources to sync.
*/
private int syncPeriod;
/**
* Delay in milliseconds for a resource to be written back in object storage after last modification.
*/
private long syncDelay;
/**
* How many resources to sync in one go.
*/
private int syncBatch;
/**
* Default expiration in milliseconds for synced resources in Redis.
*/
private long cacheExpiration;
/**
* Compress resources with gzip if their size in bytes more or equal to this value.
*/
private int compressionMinSize;
/**
* Expiration in milliseconds per resource type.
*/
private Map<String, Long> resourceTypesExpiration = new HashMap<>();

public Settings() {
}

public Settings(int maxSize, int maxSizeToCache, int syncPeriod, long syncDelay, int syncBatch, long cacheExpiration, int compressionMinSize) {
this.maxSize = maxSize;
this.maxSizeToCache = maxSizeToCache;
this.syncPeriod = syncPeriod;
this.syncDelay = syncDelay;
this.syncBatch = syncBatch;
this.cacheExpiration = cacheExpiration;
this.compressionMinSize = compressionMinSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,12 @@ public String redisKey(ResourceDescriptor descriptor, String prefix) {
String resourcePath = BlobStorageUtil.toStoragePath(prefix, descriptor.getAbsoluteFilePath());
return descriptor.getType().name().toLowerCase() + ":" + resourcePath;
}

public String getResourceType(String redisKey) {
int index = redisKey.indexOf(':');
if (index == -1) {
throw new IllegalArgumentException("Invalid redis key");
}
return redisKey.substring(0, index);
}
}
Loading