-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadvanced_mqtt.py
More file actions
62 lines (47 loc) · 2.46 KB
/
advanced_mqtt.py
File metadata and controls
62 lines (47 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from wiredflow.main.build import FlowBuilder
from wiredflow.mocks.demo_bindings_threads import remove_temporary_folder_for_demo, \
launch_demo_with_several_mqtt_connectors
def custom_logic(**parameters_to_use):
""" Simple realization of core logic """
db_connectors = parameters_to_use['db_connectors']
obtained_integers = db_connectors['int_subscriber'].load()
obtained_letters = db_connectors['str_subscriber'].load()
integers = list(map(lambda x: int(x['Generated number']), obtained_integers))
letters = list(map(lambda x: x['Generated letter'], obtained_letters))
match_to_send = f'{integers[-1]}-{letters[-1]}'
print(f'Match to send {match_to_send}')
return {'match': match_to_send}
def launch_advanced_mqtt_demo():
"""
An example of how to launch a flow to collect some data from MQTT queues,
save it into a file and then perform some transformations.
Task: implement algorithm to store all information obtained from brokers
and every 15 seconds match last obtained integer and letter and send such
a message to topic 'demo/matched'.
NB: Demo will be executed in the loop. This means that the example won't
finish calculating until you stop it yourself. Alternatively - you can assign
'execution_seconds' parameter to set the timeout
"""
flow_builder = FlowBuilder()
# Get integers values via MQTT
flow_builder.add_pipeline('int_subscriber')\
.with_mqtt_connector(source='localhost', port=1883, topic='/demo/integers',
username='wiredflow', password='wiredflow')\
.with_storage('json', preprocessing='add_datetime')
# Get letters via MQTT
flow_builder.add_pipeline('str_subscriber') \
.with_mqtt_connector(source='localhost', port=1883, topic='/demo/letters',
username='wiredflow', password='wiredflow') \
.with_storage('json', preprocessing='add_datetime')
flow_builder.add_pipeline('custom_core', timedelta_seconds=10) \
.with_core_logic(custom_logic) \
.send(destination='localhost', port=1883, topic='demo/matched',
label_to_send='match')
# Configure service and launch it
flow = flow_builder.build()
# Or simply flow.launch_flow()
# if there is no need to launch local demo http server
launch_demo_with_several_mqtt_connectors(flow, execution_seconds=20)
if __name__ == '__main__':
remove_temporary_folder_for_demo()
launch_advanced_mqtt_demo()