Add parallel state merge for some other combinator except If (#50413)

* Add parallel state merge for some other combinator except If

* add test

* update test
This commit is contained in:
flynn 2023-06-08 06:41:32 +08:00 committed by GitHub
parent b11f744252
commit 92c87dedad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 1 deletions

View File

@ -141,6 +141,13 @@ public:
nested_func->merge(place, rhs, arena);
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
{
nested_func->serialize(place, buf, version);

View File

@ -110,6 +110,13 @@ public:
nested_func->merge(place, rhs, arena);
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
{
nested_func->serialize(place, buf, version);

View File

@ -148,6 +148,13 @@ public:
nested_function->merge(nestedPlace(place), nestedPlace(rhs), arena);
}
bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
nested_function->merge(nestedPlace(place), nestedPlace(rhs), thread_pool, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
{
bool flag = getFlag(place);

View File

@ -91,6 +91,13 @@ public:
nested_func->merge(place, rhs, arena);
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
{
nested_func->serialize(place, buf, version);

View File

@ -1,3 +1,7 @@
<test>
<query>SELECT uniqExactIf(number, 1) FROM numbers_mt(1e6)</query>
<query>SELECT uniqExactIf(number, 1) FROM numbers_mt(1e7)</query>
<query>SELECT uniqExactState(number) FROM numbers_mt(1e7) Format Null</query>
<query>SELECT uniqExactArray([number]) FROM numbers_mt(1e7) Format Null</query>
<query>with (SELECT uniqExactState(number) FROM numbers_mt(1e7)) as a select uniqExactMerge(a)</query>
<query>SELECT uniqExactOrNull(number) FROM numbers_mt(1e7)</query>
</test>