feat: Support general spill mechanism for distinct aggregation#15876
feat: Support general spill mechanism for distinct aggregation#15876clay4megtr wants to merge 5 commits intofacebookincubator:mainfrom
Conversation
✅ Deploy Preview for meta-velox canceled.
|
e2be9a1 to
5c8b587
Compare
|
hi, @xiaoxmeng @mbasmanova @tanjialiang @duxiao1212 , This pr is ready for review, PTAL, Thx!
|
xiaoxmeng
left a comment
There was a problem hiding this comment.
@clay4megtr LGTM % minors. A question on null value handling. Thanks!
xiaoxmeng
left a comment
There was a problem hiding this comment.
@clay4megtr LGTM % one nit. Thanks!
|
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this in D90958171. |
tanjialiang
left a comment
There was a problem hiding this comment.
Thanks, LGTM % some nits
|
|
||
| std::vector<VectorPtr> makeInputForAggregation(const VectorPtr& input) const { | ||
| if (isSingleInputAggregate()) { | ||
| std::vector<VectorPtr> makeInputForAggregation(VectorPtr& input) const { |
There was a problem hiding this comment.
why do we drop const here?
There was a problem hiding this comment.
why do we drop const here?
since we use std::move in L214
There was a problem hiding this comment.
I think the original code was misleading. The original code, since having the const qualifier, making the std::move() actually performing a copy. your change breaks the contract of this method by making input non-const (meaning the external referenced input is going to be moved). it is normally dangerous. We should either maintain the const signature and return a copy (without std::move) or make the signature rvalue reference VectorPtr&&
There was a problem hiding this comment.
actually since the code is already landed, i'll help to make the change.
| if constexpr (std::is_same_v<T, ComplexType>) { | ||
| offset += accumulator->extractValues(*elementsVector, offset); | ||
| } else { | ||
| offset += accumulator->extractValues( |
There was a problem hiding this comment.
@clay4megtr , If the total size of distinct elements in that group exceeds 2GB, the spill fails with validateSpillBytesSize. i was wondering how do you plan to handle groups where the distinct values exceed 2GB?
There was a problem hiding this comment.
@clay4megtr , If the total size of distinct elements in that group exceeds 2GB, the spill fails with validateSpillBytesSize. i was wondering how do you plan to handle groups where the distinct values exceed 2GB?
This is a typical hot key scenario. The issue occurs not only during the spill write stage, but also during the spill merge stage, as it may involve several memory bloat issues. This is exactly what the Dynamic Aggregation Spill Mechanism is designed to address. You can find more details here: https://docs.google.com/document/d/1jhWUcvun3fHe_v4-Fi1vjBFzIbpth6dqPvwbhShwdqg/edit?tab=t.0#heading=h.4dbf3b8kvqhb
.
There was a problem hiding this comment.
thanks for the doc, @clay4megtr , the initial PR looks good to me, my only concern is that the ArrayVector's actual size > 2GB is not an uncommon cases/
| if constexpr (std::is_same_v<T, ComplexType>) { | ||
| offset += accumulator->extractValues(*elementsVector, offset); | ||
| } else { | ||
| offset += accumulator->extractValues( |
There was a problem hiding this comment.
thanks for the doc, @clay4megtr , the initial PR looks good to me, my only concern is that the ArrayVector's actual size > 2GB is not an uncommon cases/
|
@xiaoxmeng merged this pull request in ac20fbc. |

General spill mechanism implementation for the agg(distinct) scenario. For more details, please refer to #15859
Acknowledgment: Parts of this code are derived from @aditi-pandit 's PR (#7791). Much appreciated!