Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,6 @@ jobs:
runs-on: ${{ matrix.os }}
env:
SKIP_SPOTLESS_CHECK: true
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.19.0
env:
discovery.type: single-node
xpack.security.enabled: false
ES_JAVA_OPTS: "-Xms512m -Xmx512m"
ports:
- 9200:9200
options: >-
--health-cmd "curl -f http://localhost:9200/_cluster/health || exit 1"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 30s
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -273,9 +258,26 @@ jobs:
run: bash tools/build.sh
- name: Install ollama
run: bash tools/start_ollama_server.sh
- name: Start Elasticsearch
run: |
docker compose -f tools/docker/elasticsearch/docker-compose.yml down -v
docker compose -f tools/docker/elasticsearch/docker-compose.yml up -d
timeout 180 bash -c 'until curl -fsS http://localhost:9200/_cluster/health; do sleep 5; done'
- name: Start Milvus
run: |
docker compose -f tools/docker/milvus/docker-compose.yml down -v
docker compose -f tools/docker/milvus/docker-compose.yml up -d
timeout 180 bash -c 'until curl -fsS http://localhost:9091/healthz; do sleep 5; done'
- name: Run e2e tests
env:
LOG_LEVEL: INFO
run: |
export ES_HOST="http://localhost:9200"
tools/e2e.sh
export MILVUS_URI="http://localhost:19530"
tools/e2e.sh
- name: Stop Milvus
if: always()
run: docker compose -f tools/docker/milvus/docker-compose.yml down -v
- name: Stop Elasticsearch
if: always()
run: docker compose -f tools/docker/elasticsearch/docker-compose.yml down -v
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public static final class VectorStore {
public static final String ELASTICSEARCH_VECTOR_STORE =
"org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore";

// Milvus
public static final String MILVUS_VECTOR_STORE =
"org.apache.flink.agents.integrations.vectorstores.milvus.MilvusVectorStore";

// Python Wrapper
public static final String PYTHON_WRAPPER_VECTOR_STORE =
"org.apache.flink.agents.api.vectorstores.python.PythonVectorStore";
Expand Down
7 changes: 6 additions & 1 deletion dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ under the License.
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-vector-stores-milvus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-vector-stores-opensearch</artifactId>
Expand Down Expand Up @@ -156,4 +161,4 @@ under the License.
</plugin>
</plugins>
</build>
</project>
</project>
83 changes: 80 additions & 3 deletions docs/content/docs/development/vector_stores.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ For vector stores that implement `CollectionManageableVectorStore`, you can crea
* `delete_collection` / `deleteCollection`: Delete a collection by name.

{{< hint info >}}
Collection-level operations are only supported for vector stores that implement `CollectionManageableVectorStore`. Among the built-in providers, Chroma (Python), Elasticsearch (Java) and OpenSearch (Java) implement this interface.
Collection-level operations are only supported for vector stores that implement `CollectionManageableVectorStore`. Among the built-in providers, Chroma (Python), Elasticsearch (Java), OpenSearch (Java), and Milvus (Java) implement this interface.
{{< /hint >}}

{{< tabs "Collection level operations" >}}
Expand Down Expand Up @@ -642,9 +642,86 @@ public static ResourceDescriptor vectorStore() {

{{< /tabs >}}

### Milvus

[Milvus](https://milvus.io/) is an open-source vector database designed for high-dimensional vector search at scale.

{{< hint info >}}
Milvus is currently supported in the Java API only. To use Milvus from Python agents, see [Using Cross-Language Providers](#using-cross-language-providers).
{{< /hint >}}

#### Prerequisites

1. A Milvus server.

#### MilvusVectorStore Parameters

| Parameter | Type | Default | Description |
|-----------------------------|------|--------------------------------------|-----------------------------------------------------------------------------|
| `embedding_model` | str | Required | Reference to embedding model resource name |
| `collection` | str | `"flink_agents_milvus_collection"` | Default target Milvus collection name |
| `collection_name` | str | None | Alias for `collection` |
| `index` | str | None | Alias for `collection`, mainly for cross-provider compatibility |
| `id_field` | str | `"id"` | Name of the primary key field |
| `content_field` | str | `"content"` | Name of the field storing document content |
| `metadata_field` | str | `"metadata"` | Name of the JSON field storing document metadata |
| `vector_field` | str | `"embedding"` | Name of the FloatVector field used for vector search |
| `dims` | int | `768` | Vector dimensionality |
| `id_max_length` | int | `65535` | Maximum length for the VarChar primary key field |
| `content_max_length` | int | `65535` | Maximum length for the VarChar content field |
| `metric_type` | str | `"COSINE"` | Milvus metric type used by vector search |
| `index_type` | str | `"AUTOINDEX"` | Milvus vector index type |
| `index_params` | map | `{}` | Extra vector index parameters passed to Milvus |
| `metadata_index_keys` | list | `user_id`, `agent_id`, `run_id`, `actor_id`, `category` | Additional metadata JSON keys indexed with path indexes |
| `metadata_index_cast_types` | map | Default keys use `"VARCHAR"` | Per-metadata-key JSON path index cast type overrides |
| `num_shards` | int | `1` | Number of Milvus shards for newly created collections |
| `consistency_level` | str | `"BOUNDED"` | Milvus consistency level for collection creation, query, and search |
| `max_get_limit` | int | `10000` | Maximum number of documents returned by `get` when no limit is specified |
| `load_timeout_ms` | long | `120000` | Timeout for loading collections |
| `uri` | str | `"http://localhost:19530"` | Milvus endpoint |
| `host` | str | `"localhost"` | Milvus host used when `uri` is not set |
| `port` | int | `19530` | Milvus port used when `uri` is not set |
| `db_name` | str | None | Milvus database name |
| `token` | str | None | Token for Milvus authentication |
| `username` | str | None | Username for basic authentication |
| `password` | str | None | Password for basic authentication |
| `enable_precheck` | bool | `false` | Whether to enable Milvus client precheck |

{{< hint info >}}
When creating a collection, MilvusVectorStore creates a primary-key field, content field, JSON metadata field, vector field, vector index, and JSON metadata indexes. The default metadata JSON path indexes cover common filter keys such as `user_id`, `agent_id`, `run_id`, `actor_id`, and `category`; add `metadata_index_keys` for application-specific filter keys.

The default shard count is `1`. As a rough capacity-planning rule, use about one shard per 100 million vectors, and increase it for heavier write throughput.
{{< /hint >}}

#### Usage Example

{{< tabs "Milvus Usage Example" >}}

{{< tab "Java" >}}

```java
@VectorStore
public static ResourceDescriptor vectorStore() {
return ResourceDescriptor.Builder.newBuilder(ResourceName.VectorStore.MILVUS_VECTOR_STORE)
.addInitialArgument("embedding_model", "embeddingModel")
.addInitialArgument("uri", "http://localhost:19530")
.addInitialArgument("collection", "my_documents")
.addInitialArgument("dims", 1536)
.addInitialArgument("metric_type", "COSINE")
.addInitialArgument("index_type", "AUTOINDEX")
// Optional metadata JSON path indexes
// .addInitialArgument("metadata_index_keys", List.of("user_id", "agent_id", "run_id"))
.build();
}
```

{{< /tab >}}

{{< /tabs >}}

## Using Cross-Language Providers

Flink Agents supports cross-language vector store integration, allowing you to use vector stores implemented in one language (Java or Python) from agents written in the other language. This is particularly useful when a vector store provider is only available in one language (e.g., Elasticsearch is currently Java-only, Chroma is currently Python-only).
Flink Agents supports cross-language vector store integration, allowing you to use vector stores implemented in one language (Java or Python) from agents written in the other language. This is particularly useful when a vector store provider is only available in one language (e.g., Elasticsearch and Milvus are currently Java-only, Chroma is currently Python-only).

{{< hint warning >}}
**Limitations:**
Expand Down Expand Up @@ -1101,4 +1178,4 @@ public class MyVectorStore extends BaseVectorStore

{{< /tab >}}

{{< /tabs >}}
{{< /tabs >}}
3 changes: 2 additions & 1 deletion docs/content/docs/faq/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Flink Agents provides built-in integrations for many ecosystem providers. Some i
|---|---|---|
| [Chroma]({{< ref "docs/development/vector_stores#chroma" >}}) | ✅ | ❌ |
| [Elasticsearch]({{< ref "docs/development/vector_stores#elasticsearch" >}}) | ❌ | ✅ |
| [Milvus]({{< ref "docs/development/vector_stores#milvus" >}}) | ❌ | ✅ |

**MCP Server**

Expand All @@ -131,4 +132,4 @@ Flink Agents provides built-in integrations for many ecosystem providers. Some i
To avoid potential conflict with Flink cluster, the scope of the dependencies related to Flink and Flink Agents for agent job are provided. See [Maven Dependencies]({{< ref "docs/get-started/installation#maven-dependencies-for-java" >}}) for details.

To run the examples in IDE, users must enable the IDE feature: `add dependencies with provided scope to classpath`.
* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add dependencies with provided scope to classpath`**. See [Run/Debug Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html) for details.
* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add dependencies with provided scope to classpath`**. See [Run/Debug Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html) for details.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Required by Mem0LongTermMemoryTest: OpenAI-compatible chat model + ES vector store. -->
<!-- Required by Mem0LongTermMemoryTest: OpenAI-compatible chat model + vector stores. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-chat-models-openai</artifactId>
Expand All @@ -46,6 +46,11 @@
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-vector-stores-milvus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand All @@ -67,4 +72,4 @@
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,20 @@
* full retrieved item set.
*
* <p>All resources are declared as native Java implementations (Ollama chat / embedding,
* Elasticsearch vector store). Python's mem0 adapter consumes them through the cross-language
* bridge: {@code ctx.get_resource(name, type)} on the Python side returns a Java*Impl wrapper that
* delegates back into Java via Pemja.
* Elasticsearch or Milvus vector store). Python's mem0 adapter consumes them through the
* cross-language bridge: {@code ctx.get_resource(name, type)} on the Python side returns a
* Java*Impl wrapper that delegates back into Java via Pemja.
*
* <p>The test driving this agent must (1) pull the Ollama models and (2) provide ES connection env
* vars ({@code ES_HOST}, {@code ES_INDEX}, {@code ES_DIMS}, {@code ES_VECTOR_FIELD}, optional
* {@code ES_USERNAME}/{@code ES_PASSWORD}); see {@link Mem0LongTermMemoryTest}.
* <p>The test driving this agent must (1) pull the Ollama models and (2) provide vector-store
* connection env vars; see {@link Mem0LongTermMemoryTest}.
*/
public class Mem0LongTermMemoryAgent extends Agent {

public static final String CHAT_MODEL = "qwen3.6-plus";
public static final String OLLAMA_EMBEDDING_MODEL = "nomic-embed-text";
public static final String MEMORY_SET_NAME = "test_ltm";
public static final String ES_LTM_STORE = "esLtmStore";
public static final String MILVUS_LTM_STORE = "milvusLtmStore";

/** Mirrors the Python e2e: dashscope-hosted OpenAI-compatible endpoint, env-overridable. */
private static final String DEFAULT_BASE_URL = "https://coding.dashscope.aliyuncs.com/v1";
Expand Down Expand Up @@ -139,7 +140,9 @@ public static ResourceDescriptor esLtmStore() {
ResourceDescriptor.Builder.newBuilder(
ResourceName.VectorStore.ELASTICSEARCH_VECTOR_STORE)
.addInitialArgument("embedding_model", "ollamaNomicEmbedText")
.addInitialArgument("host", System.getenv("ES_HOST"))
.addInitialArgument(
"host",
System.getenv().getOrDefault("ES_HOST", "http://localhost:9200"))
.addInitialArgument(
"collection",
UUID.randomUUID().toString().substring(0, 8) + "-context");
Expand All @@ -152,6 +155,23 @@ public static ResourceDescriptor esLtmStore() {
return builder.build();
}

@VectorStore
public static ResourceDescriptor milvusLtmStore() {
return ResourceDescriptor.Builder.newBuilder(ResourceName.VectorStore.MILVUS_VECTOR_STORE)
.addInitialArgument("embedding_model", "ollamaNomicEmbedText")
.addInitialArgument(
"uri", System.getenv().getOrDefault("MILVUS_URI", "http://localhost:19530"))
.addInitialArgument(
"collection",
"flink_agents_mem0_" + UUID.randomUUID().toString().replace("-", ""))
.addInitialArgument("dims", 768)
// Test-only: Mem0 e2e reads immediately after writes. Production should use the
// default BOUNDED consistency unless immediate read-after-write visibility is
// required.
.addInitialArgument("consistency_level", "STRONG")
.build();
}

@Action(listenEventTypes = {InputEvent.EVENT_TYPE})
public static void addItems(Event event, RunnerContext ctx) throws Exception {
InputEvent inputEvent = InputEvent.fromEvent(event);
Expand Down
Loading
Loading