Skip to content

Commit 207cab9

Browse files
authored
feat: stream table name generation (#599)
feat: stream table generation
1 parent a972636 commit 207cab9

File tree

8 files changed

+103
-27
lines changed

8 files changed

+103
-27
lines changed

packages/PdoEventSourcing/src/Config/EventSourcingModule.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Ecotone\EventSourcing\EventSourcingRepositoryBuilder;
2626
use Ecotone\EventSourcing\EventStore;
2727
use Ecotone\EventSourcing\EventStreamEmitter;
28+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
2829
use Ecotone\EventSourcing\Mapping\EventMapper;
2930
use Ecotone\EventSourcing\ProjectionLifeCycleConfiguration;
3031
use Ecotone\EventSourcing\ProjectionManager;
@@ -286,6 +287,12 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
286287
Reference::to(LegacyProjectionsTableManager::class),
287288
]));
288289

290+
// Register PdoStreamTableNameProvider as an alias to LazyProophEventStore
291+
$messagingConfiguration->registerServiceDefinition(
292+
PdoStreamTableNameProvider::class,
293+
Reference::to(LazyProophEventStore::class)
294+
);
295+
289296
$messagingConfiguration->registerServiceDefinition(
290297
LazyProophProjectionManager::class,
291298
new Definition(LazyProophProjectionManager::class, [
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Ecotone\EventSourcing;
4+
5+
/**
6+
* Provides table name generation for event streams based on configured persistence strategy.
7+
* This allows runtime resolution of table names instead of hardcoding sha1 hashing.
8+
*
9+
* licence Apache-2.0
10+
*/
11+
interface PdoStreamTableNameProvider
12+
{
13+
/**
14+
* Generate the table name for a given stream based on the configured persistence strategy.
15+
*
16+
* The table name generation depends on:
17+
* - The database type (MySQL, MariaDB, PostgreSQL)
18+
* - The persistence strategy (simple, single, aggregate, partition)
19+
* - The stream name
20+
*
21+
* @param string $streamName The stream name
22+
* @return string The generated table name
23+
*/
24+
public function generateTableNameForStream(string $streamName): string;
25+
}
26+

packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,42 +12,42 @@
1212
use Doctrine\DBAL\Platforms\MySQLPlatform;
1313
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
1414
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
15+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
1516
use Ecotone\Projecting\PartitionProvider;
1617
use Enqueue\Dbal\DbalConnectionFactory;
1718
use RuntimeException;
1819

19-
use function sha1;
20-
2120
class AggregateIdPartitionProvider implements PartitionProvider
2221
{
23-
private string $streamTable;
2422
public function __construct(
2523
private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
2624
private string $aggregateType,
27-
private string $streamName
25+
private string $streamName,
26+
private PdoStreamTableNameProvider $tableNameProvider
2827
) {
29-
// This is the name Prooph uses to store events in the database
30-
$this->streamTable = '_' . sha1($this->streamName);
3128
}
3229

3330
public function partitions(): iterable
3431
{
3532
$connection = $this->getConnection();
3633
$platform = $connection->getDatabasePlatform();
3734

35+
// Resolve table name at runtime using the provider
36+
$streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName);
37+
3838
// Build platform-specific query
3939
if ($platform instanceof PostgreSQLPlatform) {
4040
// PostgreSQL: Use JSONB operators
4141
$query = $connection->executeQuery(<<<SQL
4242
SELECT DISTINCT metadata->>'_aggregate_id' AS aggregate_id
43-
FROM {$this->streamTable}
43+
FROM {$streamTable}
4444
WHERE metadata->>'_aggregate_type' = ?
4545
SQL, [$this->aggregateType]);
4646
} elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) {
4747
// MySQL/MariaDB: Use generated indexed columns for better performance
4848
$query = $connection->executeQuery(<<<SQL
4949
SELECT DISTINCT aggregate_id
50-
FROM {$this->streamTable}
50+
FROM {$streamTable}
5151
WHERE aggregate_type = ?
5252
SQL, [$this->aggregateType]);
5353
} else {

packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
namespace Ecotone\EventSourcing\Projecting;
99

10+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
1011
use Ecotone\Messaging\Config\Container\Definition;
1112
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
1213
use Ecotone\Messaging\Config\Container\Reference;
@@ -31,6 +32,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
3132
Reference::to(DbalConnectionFactory::class),
3233
$this->aggregateType,
3334
$this->streamName,
35+
Reference::to(PdoStreamTableNameProvider::class),
3436
]);
3537
}
3638
}

packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Doctrine\DBAL\Connection;
1313
use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility;
1414
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
15+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
1516
use Ecotone\Messaging\Scheduling\DatePoint;
1617
use Ecotone\Messaging\Scheduling\Duration;
1718
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
@@ -29,7 +30,8 @@ class EventStoreGlobalStreamSource implements StreamSource
2930
public function __construct(
3031
private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
3132
private EcotoneClockInterface $clock,
32-
private string $proophStreamTable,
33+
private string $streamName,
34+
private PdoStreamTableNameProvider $tableNameProvider,
3335
private int $maxGapOffset = 5_000,
3436
private ?Duration $gapTimeout = null,
3537
) {
@@ -50,7 +52,10 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =
5052

5153
$connection = $this->getConnection();
5254

53-
if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $this->proophStreamTable)) {
55+
// Resolve table name at runtime using the provider
56+
$proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName);
57+
58+
if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $proophStreamTable)) {
5459
return new StreamPage([], '');
5560
}
5661

@@ -63,7 +68,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =
6368

6469
$query = $connection->executeQuery(<<<SQL
6570
SELECT no, event_name, payload, metadata, created_at
66-
FROM {$this->proophStreamTable}
71+
FROM {$proophStreamTable}
6772
WHERE no > :position {$gapQueryPart}
6873
ORDER BY no
6974
LIMIT {$count}
@@ -106,10 +111,13 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn
106111
$minGap = $gaps[0];
107112
$maxGap = $gaps[count($gaps) - 1];
108113

114+
// Resolve table name at runtime
115+
$proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName);
116+
109117
// Query interleaved events in the gap range
110118
$interleavedEvents = $connection->executeQuery(<<<SQL
111119
SELECT no, created_at
112-
FROM {$this->proophStreamTable}
120+
FROM {$proophStreamTable}
113121
WHERE no >= :minPosition and no <= :maxPosition
114122
ORDER BY no
115123
LIMIT 100

packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
namespace Ecotone\EventSourcing\Projecting\StreamSource;
99

10+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
1011
use Ecotone\Messaging\Config\Container\Definition;
1112
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
1213
use Ecotone\Messaging\Config\Container\Reference;
@@ -16,8 +17,6 @@
1617
use Ecotone\Projecting\StreamSource;
1718
use Enqueue\Dbal\DbalConnectionFactory;
1819

19-
use function sha1;
20-
2120
class EventStoreGlobalStreamSourceBuilder implements ProjectionComponentBuilder
2221
{
2322
public function __construct(
@@ -38,15 +37,11 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
3837
[
3938
Reference::to(DbalConnectionFactory::class),
4039
Reference::to(EcotoneClockInterface::class),
41-
self::getProophTableName($this->streamName),
40+
$this->streamName,
41+
Reference::to(PdoStreamTableNameProvider::class),
4242
5_000,
4343
new Definition(Duration::class, [60], 'seconds'),
4444
],
4545
);
4646
}
47-
48-
public static function getProophTableName($streamName): string
49-
{
50-
return '_' . sha1($streamName);
51-
}
5247
}

packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Ecotone\EventSourcing\Database\LegacyProjectionsTableManager;
1111
use Ecotone\EventSourcing\EventSourcingConfiguration;
1212
use Ecotone\EventSourcing\InMemory\StreamIteratorWithPosition;
13+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
1314
use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMariaDbSimpleStreamStrategy;
1415
use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMysqlSimpleStreamStrategy;
1516
use Ecotone\EventSourcing\ProophEventMapper;
@@ -35,6 +36,7 @@
3536
use Prooph\EventStore\StreamName;
3637
use RuntimeException;
3738

39+
use function sha1;
3840
use function spl_object_id;
3941
use function str_contains;
4042

@@ -43,7 +45,7 @@
4345
/**
4446
* licence Apache-2.0
4547
*/
46-
class LazyProophEventStore implements EventStore
48+
class LazyProophEventStore implements EventStore, PdoStreamTableNameProvider
4749
{
4850
public const DEFAULT_ENABLE_WRITE_LOCK_STRATEGY = false;
4951
public const INITIALIZE_ON_STARTUP = true;
@@ -305,6 +307,26 @@ private function getPostgresPersistenceStrategyFor(?StreamName $streamName = nul
305307
};
306308
}
307309

310+
public function generateTableNameForStream(string $streamName): string
311+
{
312+
$streamNameObj = new StreamName($streamName);
313+
$eventStoreType = $this->getEventStoreType();
314+
315+
if ($this->eventSourcingConfiguration->isInMemory()) {
316+
// In-memory doesn't use table names, but return consistent format
317+
return '_' . sha1($streamName);
318+
}
319+
320+
$persistenceStrategy = match ($eventStoreType) {
321+
self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamNameObj),
322+
self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamNameObj),
323+
self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamNameObj),
324+
default => throw InvalidArgumentException::create('Unexpected match value ' . $eventStoreType)
325+
};
326+
327+
return $persistenceStrategy->generateTableName($streamNameObj);
328+
}
329+
308330
public function getEventStoreType(): string
309331
{
310332
if ($this->eventSourcingConfiguration->isInMemory()) {

packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
namespace Test\Ecotone\EventSourcing\Projecting;
99

1010
use Ecotone\EventSourcing\EventStore;
11+
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
1112
use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSource;
12-
use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder;
1313
use Ecotone\EventSourcing\Projecting\StreamSource\GapAwarePosition;
1414
use Ecotone\Lite\EcotoneLite;
1515
use Ecotone\Lite\Test\FlowTestSupport;
@@ -30,6 +30,8 @@
3030
use Test\Ecotone\EventSourcing\Projecting\Fixture\Ticket\TicketCreated;
3131
use Test\Ecotone\EventSourcing\Projecting\Fixture\Ticket\TicketEventConverter;
3232

33+
use function sha1;
34+
3335
/**
3436
* @internal
3537
*/
@@ -42,11 +44,21 @@ class GapAwarePositionIntegrationTest extends ProjectingTestCase
4244
private static EventStore $eventStore;
4345
private static ProjectingManager $projectionManager;
4446
private static string $proophTicketTable;
47+
private static PdoStreamTableNameProvider $tableNameProvider;
4548

4649
protected function setUp(): void
4750
{
4851
self::$connectionFactory = self::getConnectionFactory();
4952
self::$clock = new StubUTCClock();
53+
54+
// Create a stub table name provider
55+
self::$tableNameProvider = new class implements PdoStreamTableNameProvider {
56+
public function generateTableNameForStream(string $streamName): string
57+
{
58+
return '_' . sha1($streamName);
59+
}
60+
};
61+
5062
$projection = new #[ProjectionV2(DbalTicketProjection::NAME)] class (self::$connectionFactory->establishConnection()) extends DbalTicketProjection {
5163
};
5264
self::$projection = $projection;
@@ -69,7 +81,7 @@ classesToResolve: [$projection::class],
6981
runForProductionEventStore: true
7082
);
7183

72-
self::$proophTicketTable = EventStoreGlobalStreamSourceBuilder::getProophTableName(Ticket::STREAM_NAME);
84+
self::$proophTicketTable = self::$tableNameProvider->generateTableNameForStream(Ticket::STREAM_NAME);
7385
self::$eventStore = self::$ecotone->getGateway(EventStore::class);
7486
self::$projectionManager = self::$ecotone->getGateway(ProjectionRegistry::class)->get(DbalTicketProjection::NAME);
7587
if (self::$eventStore->hasStream(Ticket::STREAM_NAME)) {
@@ -100,7 +112,8 @@ public function test_max_gap_offset_cleaning(): void
100112
$streamSource = new EventStoreGlobalStreamSource(
101113
self::$connectionFactory,
102114
self::$clock,
103-
self::$proophTicketTable,
115+
Ticket::STREAM_NAME,
116+
self::$tableNameProvider,
104117
maxGapOffset: 3, // Only keep gaps within 3 positions
105118
gapTimeout: null
106119
);
@@ -137,7 +150,8 @@ public function test_gap_timeout_cleaning(): void
137150
$streamSource = new EventStoreGlobalStreamSource(
138151
self::$connectionFactory,
139152
self::$clock,
140-
self::$proophTicketTable,
153+
Ticket::STREAM_NAME,
154+
self::$tableNameProvider,
141155
gapTimeout: Duration::seconds(5)
142156
);
143157

@@ -170,7 +184,8 @@ public function test_gap_cleaning_noop_when_no_gaps(): void
170184
$streamSource = new EventStoreGlobalStreamSource(
171185
self::$connectionFactory,
172186
self::$clock,
173-
self::$proophTicketTable,
187+
Ticket::STREAM_NAME,
188+
self::$tableNameProvider,
174189
maxGapOffset: 1000,
175190
gapTimeout: Duration::seconds(5)
176191
);
@@ -191,7 +206,8 @@ public function test_gap_cleaning_noop_when_timeout_disabled(): void
191206
$streamSource = new EventStoreGlobalStreamSource(
192207
self::$connectionFactory,
193208
self::$clock,
194-
self::$proophTicketTable,
209+
Ticket::STREAM_NAME,
210+
self::$tableNameProvider,
195211
maxGapOffset: 1000,
196212
gapTimeout: null // No timeout
197213
);

0 commit comments

Comments
 (0)