Skip to content

Commit 1dfac07

Browse files
committed
WIP refactor separating queue backend from batch management
1 parent 7847d73 commit 1dfac07

9 files changed

+265
-317
lines changed

src/FlowExtension.php

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
use Symfony\Component\DependencyInjection\Extension\ExtensionInterface;
1414
use Symfony\Component\DependencyInjection\Reference;
1515
use Webgriffe\Esb\Model\FlowConfig;
16-
use Webgriffe\Esb\Service\QueueManager;
16+
use Webgriffe\Esb\Service\BatchManager;
17+
use Webgriffe\Esb\Service\BatchManagerFactory;
18+
use Webgriffe\Esb\Service\BeanstalkElasticsearchQueueBackend;
1719

1820
final class FlowExtension implements ExtensionInterface, CompilerPassInterface
1921
{
@@ -66,10 +68,12 @@ public function process(ContainerBuilder $container): void
6668
{
6769
//These classes are defined manually. Remove the default definitions otherwise the container generates errors
6870
//trying to autowire them
69-
$container->removeDefinition(QueueManager::class);
71+
$container->removeDefinition(BeanstalkElasticsearchQueueBackend::class);
7072
$container->removeDefinition(FlowConfig::class);
7173
$container->removeDefinition(ProducerInstance::class);
7274
$container->removeDefinition(WorkerInstance::class);
75+
$container->removeDefinition(BatchManager::class);
76+
$container->removeDefinition(BatchManagerFactory::class);
7377

7478
$flowManagerDefinition = $container->findDefinition(FlowManager::class);
7579
foreach ($this->flowsConfig as $flowName => $flowConfigData) {
@@ -78,30 +82,39 @@ public function process(ContainerBuilder $container): void
7882
$flowDefinition = new Definition(Flow::class);
7983
$flowDefinition->setAutowired(true);
8084
$flowDefinition->setArgument('$flowConfig', $flowConfig);
81-
$queueManagerId = 'flow.queue_manager.' . $flowName;
85+
$queueBackendId = 'flow.queue_backend.' . $flowName;
86+
$batchManagerFactoryId = 'flow.batch_manager_factory.' . $flowName;
8287
try {
8388
$producerDefinition = $container->findDefinition($flowConfig->getProducerServiceId());
8489
$producerDefinition->setShared(false);
8590

86-
$queueManagerDefinition = new Definition();
87-
$queueManagerDefinition
91+
$queueBackendDefinition = new Definition();
92+
$queueBackendDefinition
8893
->setShared(false)
8994
->setAutowired(true)
90-
->setClass(QueueManager::class)
95+
->setClass(BeanstalkElasticsearchQueueBackend::class)
9196
->setArgument('$flowConfig', $flowConfig)
97+
;
98+
$container->setDefinition($queueBackendId, $queueBackendDefinition);
99+
100+
$batchManagerFactoryDefinition = new Definition();
101+
$batchManagerFactoryDefinition
102+
->setShared(false)
103+
->setAutowired(true)
104+
->setClass(BatchManagerFactory::class)
105+
->setArgument('$queueBackend', new Reference($queueBackendId))
92106
->setArgument('$batchSize', $flowConfig->getProducerBatchSize())
93107
;
94-
$container->setDefinition($queueManagerId, $queueManagerDefinition);
108+
$container->setDefinition($batchManagerFactoryId, $batchManagerFactoryDefinition);
95109

96110
$producerInstanceDefinition = new Definition();
97111
$producerInstanceDefinition
98112
->setAutowired(true)
99113
->setClass(ProducerInstance::class)
100-
->setArgument('$producer', new Reference($flowConfig->getProducerServiceId()))
101114
->setArgument('$flowConfig', $flowConfig)
102-
->setArgument('$queueManager', new Reference($queueManagerId))
103-
->setArgument('$beanstalkClient', null)
104-
->setArgument('$elasticSearch', null)
115+
->setArgument('$producer', new Reference($flowConfig->getProducerServiceId()))
116+
->setArgument('$queueBackend', new Reference($queueBackendId))
117+
->setArgument('$batchManagerFactory', new Reference($batchManagerFactoryId))
105118
;
106119
$producerInstanceId = 'flow.producer_instance' . $flowName;
107120
$container->setDefinition($producerInstanceId, $producerInstanceDefinition);
@@ -129,9 +142,7 @@ public function process(ContainerBuilder $container): void
129142
->setArgument('$flowConfig', $flowConfig)
130143
->setArgument('$instanceId', $instanceId)
131144
->setArgument('$worker', new Reference($flowConfig->getWorkerServiceId()))
132-
->setArgument('$queueManager', new Reference($queueManagerId))
133-
->setArgument('$beanstalkClient', null)
134-
->setArgument('$elasticSearch', null)
145+
->setArgument('$queueBackend', new Reference($queueBackendId))
135146
;
136147
$workerInstanceId = sprintf('flow.worker_instance.%s.%s', $flowName, $instanceId);
137148
$container->setDefinition($workerInstanceId, $workerInstanceDefinition);

src/ProducerInstance.php

Lines changed: 16 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -4,123 +4,39 @@
44

55
namespace Webgriffe\Esb;
66

7-
use Amp\Beanstalk\BeanstalkClient;
8-
use function Amp\call;
97
use Amp\Loop;
108
use Amp\Promise;
119
use Psr\Log\LoggerInterface;
1210
use Webgriffe\Esb\Model\FlowConfig;
1311
use Webgriffe\Esb\Model\Job;
1412
use Webgriffe\Esb\Model\ProducedJobEvent;
13+
use Webgriffe\Esb\Service\BatchManagerFactory;
1514
use Webgriffe\Esb\Service\CronProducersServer;
16-
use Webgriffe\Esb\Service\ElasticSearch;
1715
use Webgriffe\Esb\Service\HttpProducersServer;
18-
use Webgriffe\Esb\Service\ProducerQueueManagerInterface;
19-
use Webgriffe\Esb\Service\QueueManager;
20-
21-
final class ProducerInstance implements ProducerInstanceInterface
22-
{
23-
/**
24-
* @var FlowConfig
25-
*/
26-
private $flowConfig;
27-
28-
/**
29-
* @var ProducerInterface
30-
*/
31-
private $producer;
32-
33-
/**
34-
* @var LoggerInterface
35-
*/
36-
private $logger;
3716

38-
/**
39-
* @var HttpProducersServer
40-
*/
41-
private $httpProducersServer;
17+
use Webgriffe\Esb\Service\QueueBackendInterface;
4218

43-
/**
44-
* @var CronProducersServer
45-
*/
46-
private $cronProducersServer;
19+
use function Amp\call;
4720

48-
/**
49-
* @var ProducerQueueManagerInterface
50-
*/
51-
private $queueManager;
21+
final class ProducerInstance implements ProducerInstanceInterface
22+
{
5223

5324
public function __construct(
54-
FlowConfig $flowConfig,
55-
ProducerInterface $producer,
56-
?BeanstalkClient $beanstalkClient,
57-
LoggerInterface $logger,
58-
HttpProducersServer $httpProducersServer,
59-
CronProducersServer $cronProducersServer,
60-
?ElasticSearch $elasticSearch,
61-
?ProducerQueueManagerInterface $queueManager = null
25+
private readonly FlowConfig $flowConfig,
26+
private readonly ProducerInterface $producer,
27+
private readonly LoggerInterface $logger,
28+
private readonly HttpProducersServer $httpProducersServer,
29+
private readonly CronProducersServer $cronProducersServer,
30+
private readonly QueueBackendInterface $queueBackend,
31+
private readonly BatchManagerFactory $batchManagerFactory,
6232
) {
63-
if ($beanstalkClient !== null) {
64-
trigger_deprecation(
65-
'webgriffe/esb',
66-
'2.2',
67-
'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' .
68-
'Please pass a "%s" instead.',
69-
BeanstalkClient::class,
70-
__CLASS__,
71-
ProducerQueueManagerInterface::class
72-
);
73-
}
74-
if ($elasticSearch !== null) {
75-
trigger_deprecation(
76-
'webgriffe/esb',
77-
'2.2',
78-
'Passing a "%s" to "%s" is deprecated and will be removed in 3.0. ' .
79-
'Please pass a "%s" instead.',
80-
ElasticSearch::class,
81-
__CLASS__,
82-
ProducerQueueManagerInterface::class
83-
);
84-
}
85-
$this->flowConfig = $flowConfig;
86-
$this->producer = $producer;
87-
$this->logger = $logger;
88-
$this->httpProducersServer = $httpProducersServer;
89-
$this->cronProducersServer = $cronProducersServer;
90-
91-
if ($queueManager === null) {
92-
trigger_deprecation(
93-
'webgriffe/esb',
94-
'2.2',
95-
'Not passing a "%s" to "%s" is deprecated and will be required in 3.0.',
96-
ProducerQueueManagerInterface::class,
97-
__CLASS__
98-
);
99-
100-
if (!$beanstalkClient) {
101-
throw new \RuntimeException('Cannot create a QueueManager without the Beanstalk client!');
102-
}
103-
104-
if (!$elasticSearch) {
105-
throw new \RuntimeException('Cannot create a QueueManager without the ElasticSearch client');
106-
}
107-
108-
$queueManager = new QueueManager(
109-
$this->flowConfig,
110-
$beanstalkClient,
111-
$elasticSearch,
112-
$this->logger,
113-
1000
114-
);
115-
}
116-
$this->queueManager = $queueManager;
11733
}
11834

11935
public function boot(): Promise
12036
{
12137
return call(function () {
12238
yield $this->producer->init();
123-
yield $this->queueManager->boot();
39+
yield $this->queueBackend->boot();
12440

12541
$this->logger->info(
12642
'A Producer has been successfully initialized',
@@ -164,6 +80,7 @@ function ($watcherId) {
16480
public function produceAndQueueJobs($data = null): Promise
16581
{
16682
return call(function () use ($data) {
83+
$batchManager = $this->batchManagerFactory->create();
16784
$jobsCount = 0;
16885
$job = null;
16986
try {
@@ -172,10 +89,10 @@ public function produceAndQueueJobs($data = null): Promise
17289
/** @var Job $job */
17390
$job = $jobs->getCurrent();
17491
$job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer)));
175-
$jobsCount += yield $this->queueManager->enqueue($job);
92+
$jobsCount += yield $batchManager->enqueue($job);
17693
}
17794

178-
$jobsCount += yield $this->queueManager->flush();
95+
$jobsCount += yield $batchManager->flush();
17996
} catch (\Throwable $error) {
18097
$this->logger->error(
18198
'An error occurred producing/queueing jobs.',

src/Service/BatchManager.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Webgriffe\Esb\Service;
6+
7+
use Amp\Promise;
8+
use Webgriffe\Esb\Model\JobInterface;
9+
10+
use function Amp\call;
11+
12+
final class BatchManager implements BatchManagerInterface
13+
{
14+
/**
15+
* @var JobInterface[]
16+
*/
17+
private $batch = [];
18+
19+
public function __construct(
20+
private readonly QueueBackendInterface $queueBackend,
21+
private readonly int $batchSize = 1000
22+
) {
23+
}
24+
25+
/**
26+
* @inheritdoc
27+
*/
28+
public function enqueue(JobInterface $job): Promise
29+
{
30+
return call(function () use ($job) {
31+
$jobExists = yield $this->queueBackend->jobExists($job->getUuid());
32+
if ($jobExists) {
33+
throw new \RuntimeException(
34+
sprintf(
35+
'A job with UUID "%s" already exists but this should be a new job.',
36+
$job->getUuid()
37+
)
38+
);
39+
}
40+
$this->batch[$job->getUuid()] = $job;
41+
42+
$count = count($this->batch);
43+
if ($count < $this->batchSize) {
44+
return 0; //Number of jobs actually added to the queue
45+
}
46+
47+
return yield $this->flush();
48+
});
49+
}
50+
51+
/**
52+
* @inheritdoc
53+
*/
54+
public function flush(): Promise
55+
{
56+
return call(function () {
57+
$jobsCount = count($this->batch);
58+
if ($jobsCount > 0) {
59+
yield $this->queueBackend->enqueueJobs($this->batch);
60+
}
61+
$this->batch = [];
62+
63+
return $jobsCount;
64+
});
65+
}
66+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Webgriffe\Esb\Service;
6+
7+
final class BatchManagerFactory
8+
{
9+
public function __construct(
10+
private readonly QueueBackendInterface $queueBackend,
11+
private readonly int $batchSize = 1000
12+
) {
13+
}
14+
15+
public function create(): BatchManagerInterface
16+
{
17+
return new BatchManager($this->queueBackend, $this->batchSize);
18+
}
19+
}

src/Service/ProducerQueueManagerInterface.php renamed to src/Service/BatchManagerInterface.php

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,8 @@
77
use Amp\Promise;
88
use Webgriffe\Esb\Model\JobInterface;
99

10-
interface ProducerQueueManagerInterface
10+
interface BatchManagerInterface
1111
{
12-
/**
13-
* Initializes this queue manager. Must be called before this can be used
14-
*
15-
* @return Promise<null>
16-
*/
17-
public function boot(): Promise;
18-
1912
/**
2013
* Adds a new job to the queue managed by this object. The method returns a promise that resolves to the number of
2114
* jobs that were actually added to the underlying queue. In the simplest case this number is 1, but if the

0 commit comments

Comments
 (0)