|
| 1 | +# Core Concepts: Aggregates & Event Sourcing |
| 2 | + |
| 3 | +Aggregates are a fundamental concept from Domain-Driven Design (DDD) that Streak leverages heavily, especially in the context of Event Sourcing. |
| 4 | + |
| 5 | +## What is an Aggregate? |
| 6 | + |
| 7 | +An Aggregate is a cluster of domain objects (Entities and Value Objects) that can be treated as a single unit. It has a root entity, known as the **Aggregate Root**, which is the only member of the aggregate that external objects are allowed to hold references to. |
| 8 | + |
| 9 | +The primary purpose of an Aggregate is to enforce **consistency boundaries**. Business rules and invariants that span multiple objects within the aggregate are enforced by the Aggregate Root. All changes to the state within the aggregate must go through the Aggregate Root. |
| 10 | + |
| 11 | +In Streak, Aggregates are the primary building blocks for your domain logic when using Event Sourcing. |
| 12 | + |
| 13 | +## Core Interfaces |
| 14 | + |
| 15 | +Streak defines several interfaces to represent these concepts: |
| 16 | + |
| 17 | +* `Streak\Domain\ValueObject`: A marker interface for Value Objects, indicating they should be compared by value (requires an `equals` method, typically inherited). |
| 18 | +* `Streak\Domain\Id`: Represents an identifier, extending `ValueObject`. Requires `toString(): string` and static `fromString(string $id): self` methods. |
| 19 | +* `Streak\Domain\Entity`: Represents an entity within the domain, which has a distinct identity and is compared by that identity. Requires an `id(): Entity\Id` method and an `equals` method (typically inherited). |
| 20 | +* `Streak\Domain\Aggregate`: Represents an aggregate boundary, extending `Entity`. Requires `id(): Aggregate\Id`. |
| 21 | +* `Streak\Domain\AggregateRoot`: Represents the root of an aggregate, extending `Aggregate`. Requires `id(): AggregateRoot\Id`. |
| 22 | + |
| 23 | +The hierarchy (`AggregateRoot` -> `Aggregate` -> `Entity`) requires specific types of IDs at different levels, although often a single ID type (like a UUID) implementing `AggregateRoot\Id` is used. |
| 24 | + |
| 25 | +## Identifiers (IDs) |
| 26 | + |
| 27 | +IDs in Streak are Value Objects. They uniquely identify Entities and Aggregate Roots. The base `Streak\Domain\Id` interface ensures they can be easily converted to and from strings, which is crucial for persistence and referencing. |
| 28 | + |
| 29 | +```php |
| 30 | +<?php |
| 31 | + |
| 32 | +use Streak\Domain; |
| 33 | + |
| 34 | +interface Id extends Domain\ValueObject |
| 35 | +{ |
| 36 | + public function toString(): string; |
| 37 | + public static function fromString(string $id): Domain\Id; // Often returns static or self |
| 38 | +} |
| 39 | +``` |
| 40 | + |
| 41 | +Common implementations often use UUIDs. |
| 42 | + |
| 43 | +## Loading and Saving Aggregates (Repositories) |
| 44 | + |
| 45 | +While Aggregate Roots encapsulate domain logic, **Repositories** handle the persistence concerns: loading aggregates from the `EventStore` and saving new events. |
| 46 | + |
| 47 | +**Loading (Rehydration):** |
| 48 | + |
| 49 | +1. The repository retrieves the event stream for a given aggregate ID from the `EventStore`. |
| 50 | +2. It needs a way to create a new, empty instance of the correct aggregate type. Streak defines the `Streak\Domain\AggregateRoot\Factory` interface for this purpose. |
| 51 | + * Each aggregate type requires a corresponding concrete implementation of this factory interface. |
| 52 | + * The factory's `create(AggregateRoot\Id $id): AggregateRoot` method receives the ID and is responsible for instantiating the aggregate, injecting any necessary dependencies (like clocks, services) that the factory itself holds. |
| 53 | +3. The repository calls the `replay($stream)` method (from the `EventSourcing` trait) on the instance created by the factory, which applies the historical events to restore its state. |
| 54 | + |
| 55 | +**Saving:** |
| 56 | + |
| 57 | +1. After a command is processed, the repository retrieves the uncommitted events from the aggregate instance using `$aggregate->events()` (from the `EventSourcing` trait). |
| 58 | +2. It passes these events to `EventStore->add(...$events)`. |
| 59 | + |
| 60 | +**Generic Repositories:** |
| 61 | + |
| 62 | +The combination of standardized, string-representable IDs (`AggregateRoot\Id` with `fromString`) and the **standardized `AggregateRoot\Factory` interface** enables the creation of highly **generic repositories**. A single repository service can potentially load *any* type of aggregate root if it knows how to: |
| 63 | + |
| 64 | +a) Convert a string ID back into the correct `AggregateRoot\Id` object (`Id::fromString(...)`). |
| 65 | +b) Locate and use the correct `AggregateRoot\Factory` implementation for the given aggregate type (often via a mapping or naming convention within the DI container), calling its `create($id)` method. |
| 66 | + |
| 67 | +This significantly reduces persistence boilerplate compared to systems requiring a separate repository class for every aggregate type. |
| 68 | + |
| 69 | +## Event Sourcing Implementation |
| 70 | + |
| 71 | +In Event Sourcing, the state of an Aggregate Root is not stored directly. Instead, all changes to the aggregate are recorded as a sequence of immutable Domain Events. The current state is derived by replaying these events. |
| 72 | + |
| 73 | +Streak provides traits to simplify implementing event-sourced aggregates: |
| 74 | + |
| 75 | +* `Streak\Domain\AggregateRoot\Identification`: Provides implementation for `id()` and identity comparison based on the ID. |
| 76 | +* `Streak\Domain\AggregateRoot\EventSourcing`: The core trait. It manages the sequence of uncommitted events, provides the `apply(Domain\Event $event)` method to record new events, and handles the mechanism for replaying events to rebuild state. |
| 77 | +* `Streak\Domain\AggregateRoot\Comparison`: Implements the `equals()` method based on Aggregate type and ID. |
| 78 | + |
| 79 | +**The Pattern:** |
| 80 | + |
| 81 | +1. **Define State:** Add private properties to your Aggregate Root class to hold its state. |
| 82 | +2. **Implement `Event\Sourced\AggregateRoot`:** This interface combines `AggregateRoot` with event sourcing specific methods. |
| 83 | +3. **Use Traits:** Include `Identification`, `EventSourcing`, and `Comparison` traits. |
| 84 | +4. **Write Event Appliers:** For each Domain Event that can change the aggregate's state, create a private method named `apply<EventName>(Event $event)`. This method takes the specific event type as an argument and modifies the aggregate's private properties based on the event data. |
| 85 | + ```php |
| 86 | + private function applySomethingHappened(Events\SomethingHappened $event) : void |
| 87 | + { |
| 88 | + $this->someState = $event->getSomeData(); |
| 89 | + $this->updatedAt = $event->occurredAt(); |
| 90 | + } |
| 91 | + ``` |
| 92 | +5. **Handle Commands (see below):** In your command handlers, after validating the command and business rules, call `$this->apply(new Events\SomethingHappened(...));` to record the change. |
| 93 | + |
| 94 | +*(Note: While this documentation shows direct interface implementation and trait usage, you can create your own abstract base classes (e.g., `AbstractEventSourcedAggregate`) within your project that implement the required interfaces and use the necessary Streak traits. Your concrete aggregates can then extend these base classes to reduce boilerplate.)* |
| 95 | + |
| 96 | +When the aggregate is loaded from the Event Store, the `EventSourcing` trait replays its historical events, calling the corresponding `apply<EventName>` methods to reconstruct the current state. |
| 97 | + |
| 98 | +## Handling Commands |
| 99 | + |
| 100 | +Commands represent requests to change the state of an aggregate. |
| 101 | + |
| 102 | +* **`Streak\Domain\CommandHandler`:** An interface indicating a class can handle commands. |
| 103 | +* **`Streak\Domain\Command\Handling`:** A trait that facilitates routing commands to specific handler methods within the class. |
| 104 | + |
| 105 | +**The Pattern:** |
| 106 | + |
| 107 | +1. **Implement `CommandHandler`:** Add the interface to your Aggregate Root (or a dedicated command handler service). |
| 108 | +2. **Use Trait:** Include the `Command\Handling` trait. |
| 109 | +3. **Write Command Handlers:** For each command the aggregate should handle, create a public method named `handle<CommandName>(Command $command)`. This method takes the specific command type as an argument. |
| 110 | +4. **Implement Logic:** Inside the handler method: |
| 111 | + * Validate the command data. |
| 112 | + * Check business rules and invariants based on the aggregate's current state. |
| 113 | + * If validation and rules pass, call `$this->apply(new DomainEvent(...));` to record the resulting change as one or more events. |
| 114 | + * If rules are violated, throw a domain-specific exception. |
| 115 | + |
| 116 | +```php |
| 117 | +<?php |
| 118 | + |
| 119 | +use Streak\Domain; |
| 120 | +use Streak\Domain\Event; |
| 121 | + |
| 122 | +final class MyAggregate implements Event\Sourced\AggregateRoot, Domain\CommandHandler |
| 123 | +{ |
| 124 | + use Domain\AggregateRoot\Identification; |
| 125 | + use Domain\AggregateRoot\EventSourcing; |
| 126 | + use Domain\AggregateRoot\Comparison; |
| 127 | + use Domain\Command\Handling; |
| 128 | + |
| 129 | + private $state; |
| 130 | + |
| 131 | + // Constructor usually takes the ID |
| 132 | + public function __construct(MyAggregate\Id $id) |
| 133 | + { |
| 134 | + $this->identifyBy($id); |
| 135 | + } |
| 136 | + |
| 137 | + // Command Handler |
| 138 | + public function handleDoSomething(Commands\DoSomething $command) : void |
| 139 | + { |
| 140 | + if ($this->state === 'invalid_state_for_command') { |
| 141 | + throw new Exception\CannotDoSomethingInThisState(); |
| 142 | + } |
| 143 | + |
| 144 | + // Validation passed, apply the event |
| 145 | + $this->apply(new Events\SomethingHappened($this->id(), $command->data(), \DateTimeImmutable::create())); |
| 146 | + } |
| 147 | + |
| 148 | + // Event Applier |
| 149 | + private function applySomethingHappened(Events\SomethingHappened $event) : void |
| 150 | + { |
| 151 | + $this->state = 'updated_state'; |
| 152 | + // ... update other properties |
| 153 | + } |
| 154 | +} |
| 155 | +``` |
| 156 | + |
| 157 | +This structure keeps the command processing logic and the state mutation logic separate but colocated within the Aggregate Root, ensuring it remains the guardian of its own consistency. |
| 158 | + |
| 159 | +### Event-Sourced Entities within Aggregates |
| 160 | + |
| 161 | +A powerful feature of Streak is that entities *within* an Aggregate Root can also be event-sourced. This allows complex sub-components of an aggregate to manage their own state changes through events, while still maintaining the overall consistency boundary of the Aggregate Root. |
| 162 | + |
| 163 | +**Pattern:** |
| 164 | + |
| 165 | +1. **Implement `Event\Sourced\Entity`:** The entity class implements this interface. |
| 166 | +2. **Use Traits:** Include `Entity\Identification`, `Entity\EventSourcing`, and `Entity\Comparison` traits. |
| 167 | +3. **Define State & Appliers:** The entity has its own private state properties and `apply<EventName>` methods to mutate that state based on events *it* generates. |
| 168 | +4. **Apply Events:** Methods within the entity can call `$this->apply(new EntitySpecificEvent(...));` to record changes specific to that entity. |
| 169 | +5. **Register with Root:** Crucially, the entity needs to be linked to its Aggregate Root. This is typically done in the entity's constructor by calling `$this->registerAggregateRoot($aggregateRootInstance);` (method provided by the `Entity\EventSourcing` trait). |
| 170 | + |
| 171 | +*(Note: Similar to Aggregate Roots, you can create abstract base classes for your event-sourced entities within your application to encapsulate the interface implementation and trait usage.)* |
| 172 | + |
| 173 | +**How it Works:** |
| 174 | + |
| 175 | +When an entity calls `$this->apply()`, the `Entity\EventSourcing` trait doesn't just store the event locally; it also registers the event with the Aggregate Root it was linked to via `registerAggregateRoot()`. When the Aggregate Root is saved (e.g., via `$repository->save($aggregateRoot)`), the root's `EventSourcing` trait collects not only the events applied directly to the root but also all events applied to its registered event-sourced entities. |
| 176 | + |
| 177 | +This ensures that all changes within the aggregate boundary (whether originating from the root or a nested entity) are persisted atomically as a single sequence of events for that aggregate instance. |
| 178 | + |
| 179 | +**Example:** |
| 180 | + |
| 181 | +Imagine a `Project` aggregate containing multiple `Task` entities. A `Task` could be event-sourced: |
| 182 | + |
| 183 | +```php |
| 184 | +// Inside Task.php (implements Event\Sourced\Entity) |
| 185 | +use Streak\Domain\Entity; |
| 186 | + |
| 187 | +// ... traits ... |
| 188 | + |
| 189 | +public function __construct(Project $project, Task\Id $id, ...) { |
| 190 | + $this->identifyBy($id); |
| 191 | + $this->registerAggregateRoot($project); // Link to parent |
| 192 | + // ... |
| 193 | +} |
| 194 | + |
| 195 | +public function completeTask(): void { |
| 196 | + if ($this->isCompleted) { return; } |
| 197 | + $this->apply(new Events\TaskCompleted($this->aggregateRootId(), $this->id(), ...)); |
| 198 | +} |
| 199 | + |
| 200 | +private function applyTaskCompleted(Events\TaskCompleted $event): void { |
| 201 | + $this->isCompleted = true; |
| 202 | +} |
| 203 | +``` |
| 204 | + |
| 205 | +When `$projectRepository->save($project)` is called, the `TaskCompleted` event applied by the `Task` instance will be included in the list of events saved for the `Project`. |
| 206 | + |
| 207 | +This allows for rich, encapsulated behavior within aggregates while maintaining the principles of event sourcing. |
| 208 | + |
| 209 | +## Aggregate Snapshotting (Performance Optimization) |
| 210 | + |
| 211 | +For aggregates with very long event histories, replaying all events every time the aggregate is loaded can become a performance bottleneck. **Snapshotting** is an optimization technique to mitigate this. |
| 212 | + |
| 213 | +A snapshot captures the entire state of an aggregate at a specific version (or point in time). When loading an aggregate that has a snapshot: |
| 214 | + |
| 215 | +1. The repository first loads the most recent snapshot. |
| 216 | +2. It restores the aggregate's state *from the snapshot*. |
| 217 | +3. It then loads and replays only the events that occurred *after* the snapshot was taken. |
| 218 | + |
| 219 | +This significantly reduces the number of events that need to be processed during rehydration. |
| 220 | + |
| 221 | +### The `Snapshottable` Interface |
| 222 | + |
| 223 | +Streak provides the `Streak\Domain\AggregateRoot\Snapshottable` interface for aggregates that support this optimization. It uses the **Memento pattern**: |
| 224 | + |
| 225 | +```php |
| 226 | +<?php |
| 227 | + |
| 228 | +namespace Streak\Domain\AggregateRoot; |
| 229 | + |
| 230 | +interface Snapshottable |
| 231 | +{ |
| 232 | + /** |
| 233 | + * Restores the aggregate's internal state from a previously created memento. |
| 234 | + * |
| 235 | + * @param array $memento The state representation (likely an associative array). |
| 236 | + */ |
| 237 | + public function fromMemento(array $memento); |
| 238 | + |
| 239 | + /** |
| 240 | + * Creates a representation (memento) of the aggregate's current internal state. |
| 241 | + * |
| 242 | + * @return array The state representation (likely an associative array). |
| 243 | + */ |
| 244 | + public function toMemento(): array; |
| 245 | +} |
| 246 | +``` |
| 247 | + |
| 248 | +**Implementation:** |
| 249 | + |
| 250 | +* An aggregate implementing `Snapshottable` needs to provide logic in `toMemento()` to gather all its critical state properties into an array. |
| 251 | +* It needs corresponding logic in `fromMemento()` to restore its properties from such an array. |
| 252 | +* The `EventSourcing` trait likely interacts with these methods when snapshotting is triggered. |
| 253 | + |
| 254 | +### Snapshot Storage and Strategy |
| 255 | + |
| 256 | +* Snapshots need to be stored persistently, typically in a separate storage mechanism (e.g., a database table, document store) managed by a **Snapshot Store** service. |
| 257 | +* The strategy for *when* to take a snapshot (e.g., every N events, periodically) is usually configured or implemented within the persistence layer (e.g., a repository decorator or the Unit of Work). |
| 258 | +* The `StreakBundle` might provide configuration options for enabling snapshotting and configuring the snapshot store and strategy. |
| 259 | + |
| 260 | +### Resetting Snapshots |
| 261 | + |
| 262 | +If the structure of an aggregate's state changes or snapshots become corrupted, you might need to clear them. The `streak:snapshots:reset` console command ([see Console Commands](../symfony-bundle/console-commands.md)) is provided for this purpose, forcing aggregates to be rebuilt from their full event history on the next load. |
0 commit comments