-
Notifications
You must be signed in to change notification settings - Fork 96
KV-events abstraction #356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6a6b5b9
e1f069c
a766310
ab34a6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,7 +78,7 @@ def create_llm(): | |
| disable_hybrid_kv_cache_manager=True, | ||
| kv_events_config=kv_events_config, | ||
| block_size=16, | ||
| prefix_caching_hash_algo="sha256_cbor", | ||
| prefix_caching_hash_algo="sha256_cbor_64bit", | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had this error when running the test: |
||
| enable_lora=True, | ||
| max_model_len=4096, | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| /* | ||
| Copyright 2026 The llm-d Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package engineadapter | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/llm-d/llm-d-kv-cache/pkg/kvevents/events" | ||
| ) | ||
|
|
||
| // EngineType represents the type of LLM engine. | ||
| type EngineType string | ||
|
|
||
| const ( | ||
| // EngineTypeVLLM represents the vLLM engine. | ||
| EngineTypeVLLM EngineType = "vllm" | ||
| ) | ||
|
|
||
| // RawMessage holds the pre-parsed framing metadata from a received transport | ||
| // message, with the payload still in raw (not yet decoded) bytes. | ||
| // It is returned by ReceiveMessage and passed to DecodeMessageToEventBatch. | ||
| type RawMessage struct { | ||
| // PodID that is parsed from the topic. | ||
| PodID string | ||
| // Model name that is parsed from the topic. | ||
| ModelName string | ||
| // Sequence is the message sequence number from the transport. | ||
| Sequence uint64 | ||
| // Topic is the original transport topic string. | ||
| Topic string | ||
| // Payload is the raw msgpack-encoded event batch bytes, not yet decoded. | ||
| Payload []byte | ||
| // Adapter is the engine adapter that can decode this payload. | ||
| Adapter EngineAdapter | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume that this was added here to let the pool manage the decoding of the payload? A pool already has a reference to a single adapter type, we can eject this circular dependency. |
||
| } | ||
|
|
||
| // NewAdapter creates a new engine adapter based on the engine type. | ||
| func NewAdapter(engineType EngineType) (EngineAdapter, error) { | ||
| // It looks useless right now but we're preparing for future support of other engines ;) | ||
| switch engineType { | ||
| case EngineTypeVLLM: | ||
| return NewVLLMAdapter() | ||
| default: | ||
| return nil, fmt.Errorf("unknown engine type: %s", engineType) | ||
| } | ||
| } | ||
|
|
||
| // EngineAdapter defines the interface for engine-specific adapters. | ||
| // Each inference engine has its own adapter implementation that handles | ||
| // engine-specific message receiving, decoding, and connection management. | ||
| type EngineAdapter interface { | ||
NaomiEisen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // ReceiveMessage receives a raw message and returns a RawMessage | ||
| // with pre-parsed framing metadata, but with the payload still in raw bytes. | ||
| // This is intentionally cheap — no event payload decoding happens here. | ||
| ReceiveMessage(ctx context.Context) (*RawMessage, error) | ||
|
|
||
| // DecodeMessageToEventBatch decodes the raw payload of a RawMessage into a | ||
| // fully populated EventBatch. | ||
| DecodeMessageToEventBatch(msg *RawMessage) (*events.EventBatch, error) | ||
|
|
||
| // Connect establishes a connection to a remote endpoint. | ||
| Connect(ctx context.Context, endpoint string) error | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we combine |
||
|
|
||
| // Bind listens on a local endpoint for incoming connections. | ||
| Bind(ctx context.Context, endpoint string) error | ||
|
|
||
| // SubscribeToTopic sets the topic filter for receiving messages. | ||
| SubscribeToTopic(topicFilter string) error | ||
|
|
||
| // Close closes the adapter and releases all resources. | ||
| Close() error | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, test events were created using specific event structures and then converted to a tagged union format via ToTaggedUnion(). This tagged union matched the exact format vllm sends to llm-d. The tagged union structure was necessary because of double marshaling: first to extracted the event type tag, and the second for the actual event data. I avoided it so I completely removed the ToTaggedUnion().