Skip to content

Commit 4a2fd69

Browse files
committed
Optimize dynamic subqueries using the same task index
1 parent fb35d0f commit 4a2fd69

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

syncanysql/calculaters/query_tasker_calculater.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ def start(self, tasker, loader, arguments, task_config, **kwargs):
2626
if key[:knl] != kn:
2727
continue
2828
self.arguments[key[knl:]] = value
29-
self.create_executor_tasker()
29+
self.create_executor_tasker(current_tasker)
3030
finally:
3131
_thread_local.current_tasker = current_tasker
3232

3333
def calculate(self, primary_keys, query, task_config, *args):
3434
current_tasker = _thread_local.current_tasker
3535
try:
3636
if self.executor is None:
37-
self.create_executor_tasker()
37+
self.create_executor_tasker(current_tasker)
3838
with self.executor as executor:
3939
for exp, values in query["filters"].items():
4040
if not exp:
@@ -54,7 +54,7 @@ def calculate(self, primary_keys, query, task_config, *args):
5454
self.executor, self.tasker = None, None
5555
_thread_local.current_tasker = current_tasker
5656

57-
def create_executor_tasker(self):
57+
def create_executor_tasker(self, current_tasker):
5858
from ..executor import Executor
5959
from ..taskers.query import QueryTasker
6060

@@ -64,6 +64,9 @@ def create_executor_tasker(self):
6464
config["output"] = "&.--.__queryTasker_" + str(id(executor)) + "::" + config["output"].split("::")[-1]
6565
config["name"] = config["name"] + "#queryTasker"
6666
tasker = QueryTasker(config, is_inner_subquery=True)
67+
if current_tasker and hasattr(current_tasker, "tasker_index"):
68+
setattr(tasker, "tasker_index", executor.distribute_tasker_index())
69+
tasker.name = "[%s]%s" % (current_tasker.tasker_index, tasker.name)
6770
executor.runners.extend(tasker.start(config.get("name"), executor, executor.session_config,
6871
executor.manager, self.arguments))
6972
self.tasker, self.executor = tasker, executor

syncanysql/taskers/query.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ def start(self, name, executor, session_config, manager, arguments):
143143
tasker.add_hooker(ReduceHooker(executor, session_config, manager, arguments,
144144
self, copy.deepcopy(self.config), batch, aggregate))
145145
arguments = self.compile_tasker(arguments, tasker)
146-
setattr(tasker, "tasker_index", executor.distribute_tasker_index())
147-
tasker.name = "[%s]%s" % (tasker.tasker_index, tasker.name)
146+
if not hasattr(tasker, "tasker_index"):
147+
setattr(tasker, "tasker_index", executor.distribute_tasker_index())
148+
tasker.name = "[%s]%s" % (tasker.tasker_index, tasker.name)
148149
self.tasker, self.dependency_taskers, self.arguments = tasker, dependency_taskers, arguments
149150
return [self]
150151

0 commit comments

Comments
 (0)