-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpublisher.py
More file actions
68 lines (42 loc) · 1.46 KB
/
publisher.py
File metadata and controls
68 lines (42 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from polyfactory.factories.pydantic_factory import ModelFactory
from cyberfusion.RabbitMQConsumer.config import Config
from cyberfusion.RPCClient import RabbitMQCredentials, RPCClient
from cyberfusion.RabbitMQConsumer.contracts import HandlerBase
from cyberfusion.RabbitMQConsumer.utilities import (
get_exchange_handler_class_request_model,
import_exchange_handler_modules,
)
config = Config("rabbitmq.yml")
assert len(config.virtual_hosts) == 1
virtual_host = config.virtual_hosts[0]
assert len(virtual_host.exchanges) == 1
exchange = virtual_host.exchanges[0]
credentials = RabbitMQCredentials(
ssl_enabled=config.server.ssl,
port=config.server.port,
host=config.server.host,
username=config.server.username,
password=config.server.password,
virtual_host_name=virtual_host.name,
)
client = RPCClient(
credentials,
queue_name=virtual_host.queue,
exchange_name=exchange.name,
timeout=5,
)
def get_handler() -> HandlerBase:
modules = import_exchange_handler_modules([exchange])
module = modules[exchange.name]
handler = module.Handler()
return handler
def get_body(handler: HandlerBase) -> dict:
request_model = get_exchange_handler_class_request_model(handler)
factory = ModelFactory.create_factory(request_model)
built_model = factory.build()
body = built_model.model_dump()
return body
handler = get_handler()
body = get_body(handler)
response = client.request(body)
print(response)