KV-events abstraction #356

Open
NaomiEisen wants to merge 2 commits into llm-d:main from NaomiEisen:kvevents-abstraction

@NaomiEisen NaomiEisen commented Feb 25, 2026

Overview

This PR introduces abstraction layers for KV-cache events. The refactoring separates transport protocols, serialization, and engine-specific event structure into distinct layers.

See design docs for full review.

Key Changes

New Abstraction Layers

  • Transport Layer (pkg/kvevents/transport/): Abstracts communication protocols.
  • Decoder Layer (pkg/kvevents/decoder/): Abstracts serialization formats.
  • Engine Adapter Layer (pkg/kvevents/engineadapter/): Converts engine-specific events to generic events.
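
The layering above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: every type and method name here (`Transport.Receive`, `Decoder.Decode`, `EngineAdapter.ToEvents`, `GenericEvent`, `ReceiveEvents`) is hypothetical, the transport is an in-memory queue instead of ZMQ, and JSON stands in for msgpack so the sketch needs only the standard library.

```go
package main

import (
	"encoding/json"
	"errors"
)

// RawMessage is what the transport layer hands upward: a topic and undecoded bytes.
type RawMessage struct {
	Topic   string
	Payload []byte
}

// Transport abstracts the communication protocol (hypothetical shape).
type Transport interface {
	Receive() (RawMessage, error)
}

// Decoder abstracts the serialization format (hypothetical shape).
type Decoder interface {
	Decode(data []byte, out any) error
}

// GenericEvent is an engine-neutral event; the fields are illustrative only.
type GenericEvent struct {
	Kind   string   `json:"kind"`
	Hashes []uint64 `json:"hashes"`
}

// EngineAdapter converts engine-specific messages into generic events.
type EngineAdapter interface {
	ToEvents(msg RawMessage) ([]GenericEvent, error)
}

// queueTransport is an in-memory stand-in for a real socket.
type queueTransport struct{ queue []RawMessage }

func (q *queueTransport) Receive() (RawMessage, error) {
	if len(q.queue) == 0 {
		return RawMessage{}, errors.New("no messages queued")
	}
	msg := q.queue[0]
	q.queue = q.queue[1:]
	return msg, nil
}

// jsonDecoder stands in for a msgpack decoder (assumption for this sketch).
type jsonDecoder struct{}

func (jsonDecoder) Decode(data []byte, out any) error { return json.Unmarshal(data, out) }

// demoAdapter decodes a payload exactly once and returns structured events.
type demoAdapter struct{ dec Decoder }

func (a demoAdapter) ToEvents(msg RawMessage) ([]GenericEvent, error) {
	var events []GenericEvent
	if err := a.dec.Decode(msg.Payload, &events); err != nil {
		return nil, err
	}
	return events, nil
}

// ReceiveEvents pulls one message through all three layers.
func ReceiveEvents(t Transport, a EngineAdapter) ([]GenericEvent, error) {
	msg, err := t.Receive()
	if err != nil {
		return nil, err
	}
	return a.ToEvents(msg)
}
```

In this shape the caller only sees `ReceiveEvents` and `GenericEvent`; swapping the protocol or the wire format means providing a different `Transport` or `Decoder` without touching the consumer.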

Event Processing Refactor

  • Moved event processing logic into event structures: Each event type (BlockStoredEvent, BlockRemovedEvent, AllBlocksClearedEvent) now implements its own Process() method.
  • Removed double marshal/unmarshal: Events are decoded once by the adapter and passed as structured data to the pool.
  • Added ExtraKeys field to support vLLM's new event format (currently unused).

Testing

Tested on:

  • Unit tests: pkg/kvevents/engineadapter/vllm_adapter_test.go
  • tests/integration/kv_events_test.go
  • pkg/kvevents/subscriber_manager_test.go
  • Performance tests: Using this guide and comparing results against llm-d-inference-scheduler:v0.5.0.

@github-actions

Unsigned commits detected! Please sign your commits.

For instructions on how to set up GPG/SSH signing and verify your commits, please see GitHub Documentation.

Medium: &medium,
LoraName: nil,

// Create event in vLLM msgpack array format: [tag, hashes, parent, tokens, blockSize, loraID, medium, loraName]
Author

Previously, test events were created using specific event structures and then converted to a tagged union format via ToTaggedUnion(). This tagged union matched the exact format vLLM sends to llm-d. The tagged union structure was necessary because of double marshaling: a first pass to extract the event type tag, and a second for the actual event data. The new code avoids the second pass, so I removed ToTaggedUnion() completely.

kv_events_config=kv_events_config,
block_size=16,
-prefix_caching_hash_algo="sha256_cbor",
+prefix_caching_hash_algo="sha256_cbor_64bit",
Author

Had this error when running the test:
INFO 02-24 02:10:17 [__init__.py:235] Automatically detected platform cuda.
usage: vllm serve [model_tag] [options]
vllm serve: error: argument --prefix-caching-hash-algo: invalid choice: 'sha256_cbor' (choose from builtin, sha256, sha256_cbor_64bit)

// getHashAsUint64 converts vLLM hash formats (uint64 or []byte) to uint64.
// This handles both legacy uint64 hashes and new []byte hashes by taking
// the last 8 bytes and interpreting them as a big-endian integer.
func (v *VLLMAdapter) getHashAsUint64(raw any) (uint64, error) {
Author

Maybe this should be a general utility function rather than a vLLM-specific one.
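
A standalone version of the conversion described in the doc comment might look like the sketch below. The name `HashToUint64` is hypothetical, and rejecting byte slices shorter than 8 bytes is an assumption of this sketch; the doc comment does not say how the adapter treats short input.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// HashToUint64 converts a block hash that arrives either as a uint64 or as a
// byte slice into a uint64. For byte slices it takes the last 8 bytes and
// interprets them as a big-endian integer, as the doc comment describes.
func HashToUint64(raw any) (uint64, error) {
	switch h := raw.(type) {
	case uint64:
		return h, nil
	case []byte:
		// Assumption: input shorter than 8 bytes is an error rather than zero-padded.
		if len(h) < 8 {
			return 0, fmt.Errorf("hash too short: got %d bytes, need at least 8", len(h))
		}
		return binary.BigEndian.Uint64(h[len(h)-8:]), nil
	default:
		return 0, fmt.Errorf("unsupported hash type %T", raw)
	}
}
```

Since nothing in this function depends on adapter state, it could live in a shared package and be reused by adapters for other engines.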

// parseVLLMTopic extracts pod ID and model name from vLLM topic format.
// Expected format: "pod_id@model_name"
// TODO: Find a way to avoid it
func parseVLLMTopic(topic string) (podID, modelName string) {
Author

I kept the same logic as before
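
For reference, a rough sketch of this kind of topic parsing is shown below. It is not the function in this PR: `ParseTopic` and its `ok` return value are hypothetical. The doc comment gives the format as "pod_id@model_name", while the Helm template in this PR publishes topics of the form "kv@${POD_IP}@model", so the sketch assumes an optional "kv@" prefix and accepts both.

```go
package main

import "strings"

// ParseTopic splits a topic into pod ID and model name.
// Assumption: an optional "kv@" prefix is stripped first, then the remainder
// is split at the first "@". A two-part topic whose pod ID is literally "kv"
// is therefore not supported by this sketch.
func ParseTopic(topic string) (string, string, bool) {
	rest := strings.TrimPrefix(topic, "kv@")
	podID, modelName, found := strings.Cut(rest, "@")
	if !found || podID == "" || modelName == "" {
		return "", "", false
	}
	return podID, modelName, true
}
```

Returning an explicit `ok` flag lets the caller drop malformed topics instead of indexing events under empty identifiers.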

return &events.AllBlocksClearedEvent{}, nil
}

// TODO: not sure if it best to keep or remove these
Author

I'm not sure whether it's better to hide the inner structures from the subscriber (so it only uses the adapter) or to have it call those methods directly on the transport.

}

// Check if pod matches our label selector
if !r.Config.PodLabelSelector.Matches(labels.Set(pod.Labels)) {
Author

We might need to introduce the inference engine as one of the pod's identifiers.

{{- if .Values.kvCacheManager.enabled }}
--kv-events-config "{\"enable_kv_cache_events\":{{ .Values.kvCacheManager.enabled }},\"publisher\":\"zmq\",\"endpoint\":\"{{ include "chart.kvCacheManagerServiceUrl" . }}\",\"topic\":\"kv@${POD_IP}@{{ .Values.vllm.model.name }}\"}" \
- --prefix-caching-hash-algo sha256_cbor \
+ --prefix-caching-hash-algo sha256_cbor_64bit \
Author

Had this error:
INFO 02-24 02:10:17 [__init__.py:235] Automatically detected platform cuda.
usage: vllm serve [model_tag] [options]
vllm serve: error: argument --prefix-caching-hash-algo: invalid choice: 'sha256_cbor' (choose from builtin, sha256, sha256_cbor_64bit)

@@ -0,0 +1,145 @@
// Copyright 2025 The llm-d Authors.
Author

This file is very similar to the previous zmq_subscriber.go. I'm not sure why it's not just showing as 'renamed' + the changed lines. If it's difficult to compare, I can try to fix it

@NaomiEisen NaomiEisen marked this pull request as draft February 25, 2026 00:50
…stractions for multi-engine support. Modify Pool and Subscribers to use new layers.
@NaomiEisen NaomiEisen force-pushed the kvevents-abstraction branch from 6022335 to 6a6b5b9 Compare March 2, 2026 11:32
@NaomiEisen NaomiEisen marked this pull request as ready for review March 2, 2026 11:38
Collaborator

@sagearc sagearc left a comment

Hey @NaomiEisen, awesome work here! This is a big one though, 4 new packages plus the core refactor is a lot to review together. Can you split it into multiple, more focused PRs? Thanks!

@NaomiEisen
Author

> Hey @NaomiEisen, awesome work here! This is a big one though, 4 new packages plus the core refactor is a lot to review together. Can you split it into multiple, more focused PRs? Thanks!

Thanks for your response, and apologies for the inconvenience 🙏
I'm not sure how to split this into separate PRs, since the changes and new packages are closely tied together and I developed and tested them as a single unit to preserve the existing logic while introducing the new abstraction.

I'm concerned that splitting it might result in intermediate PRs that are not fully functional or don't make sense from a design perspective. Also, if I understood correctly, we'd like to get these changes ASAP, and I'm afraid that breaking them up might slow down the process (at least from my side).

That said, I'm happy to adjust if you have suggestions :)

Member

@vMaroon vMaroon left a comment

Overall great work - the instinct and direction are right.

Before we think about splitting into multiple PRs, I think we should first tighten the abstractions. Seeing the actual code makes it clearer than the design doc did - the interface surface is wider than what the callers actually need. I left some comments, but the common thread is to design each interface from the caller's perspective, and let adapters own their internals privately.

In practice, this means focusing on the essence of your work: the contracts between the pool, engine adapters, and subscribers.

limitations under the License.
*/

package decoder
Member

This package wraps a single msgpack.Unmarshal call, and vllm_adapter.go still calls msgpack.Unmarshal directly in the converters. I think we can drop this package and have serialization remain an internal detail of each adapter.

// EngineAdapter defines the interface for engine-specific adapters.
// Each inference engine has its own adapter implementation that handles
// engine-specific operations.
type EngineAdapter interface {
Member

Transport() and Decode() are not used outside the adapter. As with the decoder, I think we can collapse the transport into the adapter until the separation is needed.

DecodeMessageToEventBatch(msg *RawMessage) (*events.EventBatch, error)

// Connect establishes a connection to a remote endpoint.
Connect(ctx context.Context, endpoint string) error
Member

Can we combine Connect, Bind and SubscribeToTopic into Setup(ctx, endpoint, topicFilter, remote bool) error?

Type() EventType

// Process processes the event and updates the index.
Process(ctx context.Context, index kvblock.Index, tokenProcessor kvblock.TokenProcessor,
Member

Having events know about kvblock.Index and TokenProcessor couples data to infra. What if a future engine needs different processing logic? Consider keeping events as pure data and letting the pool own the processing as it previously did.

The processing switch-case in pool.digestEvents was one function that kept the indexing coupling in one place - that's a tighter contract than distributing it across every event type.
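
To make the alternative concrete, here is a minimal sketch of events as pure data with the pool owning all index mutation in one type switch. The names are simplified stand-ins (`BlockStored`, `Pool.Digest`, a plain map as the index), not the real `kvblock.Index` or `pool.digestEvents`, and clearing every block of the pod on `AllBlocksCleared` is an assumption made only for illustration.

```go
package main

import "fmt"

// Event is a closed set of pure-data event types.
type Event interface{ isEvent() }

type BlockStored struct{ Hashes []uint64 }
type BlockRemoved struct{ Hashes []uint64 }
type AllBlocksCleared struct{}

func (BlockStored) isEvent()      {}
func (BlockRemoved) isEvent()     {}
func (AllBlocksCleared) isEvent() {}

// Pool owns the index and every piece of logic that mutates it.
type Pool struct {
	index map[uint64]map[string]bool // block hash -> set of pod IDs
}

func NewPool() *Pool { return &Pool{index: make(map[uint64]map[string]bool)} }

// Digest applies a batch of events for one pod; the indexing coupling stays here.
func (p *Pool) Digest(podID string, events []Event) error {
	for _, ev := range events {
		switch e := ev.(type) {
		case BlockStored:
			for _, h := range e.Hashes {
				if p.index[h] == nil {
					p.index[h] = make(map[string]bool)
				}
				p.index[h][podID] = true
			}
		case BlockRemoved:
			for _, h := range e.Hashes {
				p.evict(h, podID)
			}
		case AllBlocksCleared:
			// Assumption: clear every block recorded for this pod.
			for h := range p.index {
				p.evict(h, podID)
			}
		default:
			return fmt.Errorf("unknown event type %T", ev)
		}
	}
	return nil
}

// evict removes one pod from a block entry and drops the entry when empty.
func (p *Pool) evict(hash uint64, podID string) {
	delete(p.index[hash], podID)
	if len(p.index[hash]) == 0 {
		delete(p.index, hash)
	}
}

// Has reports whether the pod is recorded as holding the block.
func (p *Pool) Has(hash uint64, podID string) bool { return p.index[hash][podID] }
```

With this split, an adapter for another engine only has to produce these data types; any change to indexing behavior touches `Digest` alone.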


// Transport defines the interface for receiving raw bytes from different
// transport protocols (ZMQ, HTTP, gRPC, etc.).
type Transport interface {
Member

This looks like a ZMQ wrapper and is not used by the subscriber - which strengthens the point of collapsing it into the adapter.

// Payload is the raw msgpack-encoded event batch bytes, not yet decoded.
Payload []byte
// Adapter is the engine adapter that can decode this payload.
Adapter EngineAdapter
Member

I assume this was added to let the pool manage the decoding of the payload? A pool already has a reference to a single adapter type, so we can remove this circular dependency.
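
A sketch of what that could look like: the message carries only data, and the pool decodes with the adapter it was constructed with. All names here are hypothetical, events are plain strings, and a comma-separated payload stands in for msgpack to keep the example dependency-free.

```go
package main

import (
	"errors"
	"strings"
)

// RawMessage carries only data; it no longer references an adapter.
type RawMessage struct {
	Topic   string
	Payload []byte
}

// EngineAdapter is the only decoding contract the pool needs (hypothetical shape).
type EngineAdapter interface {
	Decode(payload []byte) ([]string, error)
}

// Pool holds a single adapter, so messages do not need to carry one.
type Pool struct {
	adapter EngineAdapter
	Seen    []string // decoded events, kept only for illustration
}

func NewPool(a EngineAdapter) *Pool { return &Pool{adapter: a} }

// Handle decodes the payload with the pool's own adapter and records the events.
func (p *Pool) Handle(msg RawMessage) error {
	events, err := p.adapter.Decode(msg.Payload)
	if err != nil {
		return err
	}
	p.Seen = append(p.Seen, events...)
	return nil
}

// csvAdapter is a toy adapter: a comma-separated payload stands in for msgpack.
type csvAdapter struct{}

func (csvAdapter) Decode(payload []byte) ([]string, error) {
	if len(payload) == 0 {
		return nil, errors.New("empty payload")
	}
	return strings.Split(string(payload), ","), nil
}
```

The dependency now points one way only: the pool knows the adapter, and the message type knows neither.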


3 participants