Merge pull request #59009 from nickitat/uniq_optimisation_for_distributed

`uniqExact` state parallel merging for distributed queries
Dmitry Novik 2024-01-22 14:05:02 +01:00 committed by GitHub
commit a18a8d8ea3
4 changed files with 74 additions and 3 deletions
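
In short: for a distributed `uniqExact` query the initiator receives one serialized hash-set state per remote server and has to merge them all. Before this change every incoming state was deserialized into a single-level hash set and merged on a single thread. The diff below (a) routes the single-level-to-two-level conversion decision through a shared `worthConvertingToTwoLevel` helper, (b) teaches `UniqExactSet::read` to deserialize a sufficiently large state directly into a two-level (bucketed) set, (c) lets `Aggregator::mergeWithoutKeyStreamsImpl` hand such states to the pool-aware `merge` overload, and (d) adds a performance test for the distributed case.

The core idea is the two-level set: once both states use the same fixed bucketing, corresponding buckets can be merged independently on different threads. A minimal standalone sketch of that idea (toy types and names, not ClickHouse code):

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <thread>
#include <unordered_set>
#include <vector>

/// Toy stand-in for a two-level hash set: values are spread over a fixed number of
/// buckets by hash, so two sets with the same bucketing can be merged bucket by bucket.
struct ToyTwoLevelSet
{
    static constexpr size_t NUM_BUCKETS = 256;
    std::array<std::unordered_set<uint64_t>, NUM_BUCKETS> buckets;

    void insert(uint64_t x) { buckets[std::hash<uint64_t>{}(x) % NUM_BUCKETS].insert(x); }

    size_t size() const
    {
        size_t res = 0;
        for (const auto & bucket : buckets)
            res += bucket.size();
        return res;
    }

    /// Merge `other` into `this`; each thread owns a disjoint slice of buckets, so no locks are needed.
    void merge(const ToyTwoLevelSet & other, size_t num_threads)
    {
        std::vector<std::thread> workers;
        for (size_t t = 0; t < num_threads; ++t)
            workers.emplace_back([&, t]
            {
                for (size_t b = t; b < NUM_BUCKETS; b += num_threads)
                    buckets[b].insert(other.buckets[b].begin(), other.buckets[b].end());
            });
        for (auto & worker : workers)
            worker.join();
    }
};
```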

@@ -350,7 +350,7 @@ struct Adder
if constexpr (Data::is_able_to_parallelize_merge)
{
if (data.set.isSingleLevel() && data.set.size() > 100'000)
if (data.set.isSingleLevel() && data.set.worthConvertingToTwoLevel(data.set.size()))
data.set.convertToTwoLevel();
}
}
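
Note: the only change in this first hunk is that the hard-coded `size() > 100'000` check in `Adder` now goes through the new `UniqExactSet::worthConvertingToTwoLevel` helper (added further down), so the conversion decision made while accumulating values and the one made while deserializing a remote state in `read()` share a single threshold.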

@@ -11,10 +11,16 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
}
template <typename SingleLevelSet, typename TwoLevelSet>
class UniqExactSet
{
static_assert(std::is_same_v<typename SingleLevelSet::value_type, typename TwoLevelSet::value_type>);
static_assert(std::is_same_v<typename SingleLevelSet::Cell::State, HashTableNoState>);
public:
using value_type = typename SingleLevelSet::value_type;
@@ -147,7 +153,31 @@ public:
}
}
void read(ReadBuffer & in) { asSingleLevel().read(in); }
void read(ReadBuffer & in)
{
size_t new_size = 0;
auto * const position = in.position();
readVarUInt(new_size, in);
if (new_size > 100'000'000'000)
throw DB::Exception(
DB::ErrorCodes::TOO_LARGE_ARRAY_SIZE, "The size of serialized hash table is suspiciously large: {}", new_size);
if (worthConvertingToTwoLevel(new_size))
{
two_level_set = std::make_shared<TwoLevelSet>(new_size);
for (size_t i = 0; i < new_size; ++i)
{
typename SingleLevelSet::Cell x;
x.read(in);
asTwoLevel().insert(x.getValue());
}
}
else
{
in.position() = position; // Rollback position
asSingleLevel().read(in);
}
}
void write(WriteBuffer & out) const
{
@@ -166,6 +196,8 @@ public:
return two_level_set ? two_level_set : std::make_shared<TwoLevelSet>(asSingleLevel());
}
static bool worthConvertingToTwoLevel(size_t size) { return size > 100'000; }
void convertToTwoLevel()
{
two_level_set = getTwoLevelSet();
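
The new `read` above peeks at the serialized element count first: a suspiciously huge count is rejected, a large-but-sane one is deserialized straight into a freshly allocated two-level set (so the later merge never has to convert it), and anything small rolls the buffer back and reads into the single-level set exactly as before. A standalone sketch of that peek-decide-rewind pattern, using plain `std::istream` and fixed-width integers instead of ClickHouse's `ReadBuffer`/`readVarUInt` (names are illustrative):

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <istream>
#include <stdexcept>
#include <unordered_set>
#include <vector>

constexpr uint64_t TWO_LEVEL_THRESHOLD = 100'000;       /// same cutoff as worthConvertingToTwoLevel
constexpr uint64_t MAX_SANE_SIZE = 100'000'000'000ULL;  /// sanity limit against corrupted streams

/// Reads a size prefix, then either streams the values straight into a bucketed layout
/// or rewinds and fills the flat set, mirroring the branch in UniqExactSet::read.
void readState(std::istream & in,
               std::unordered_set<uint64_t> & single_level,
               std::vector<std::unordered_set<uint64_t>> & two_level)
{
    const auto start = in.tellg();                      /// remember the position for rollback
    uint64_t size = 0;
    in.read(reinterpret_cast<char *>(&size), sizeof(size));

    if (size > MAX_SANE_SIZE)
        throw std::runtime_error("serialized hash table is suspiciously large");

    auto read_value = [&in]
    {
        uint64_t value = 0;
        in.read(reinterpret_cast<char *>(&value), sizeof(value));
        return value;
    };

    if (size > TWO_LEVEL_THRESHOLD)
    {
        /// Big state: build the parallel-friendly representation right away.
        two_level.assign(256, {});
        for (uint64_t i = 0; i < size; ++i)
        {
            const uint64_t value = read_value();
            two_level[std::hash<uint64_t>{}(value) % two_level.size()].insert(value);
        }
    }
    else
    {
        /// Small state: roll back and let the flat set read itself, as before the change.
        in.seekg(start);
        in.read(reinterpret_cast<char *>(&size), sizeof(size));
        for (uint64_t i = 0; i < size; ++i)
            single_level.insert(read_value());
    }
}
```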

@@ -3054,6 +3054,8 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const
{
using namespace CurrentMetrics;
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
{
@@ -3062,11 +3064,26 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
res = place;
}
ThreadPool thread_pool{AggregatorThreads, AggregatorThreadsActive, AggregatorThreadsScheduled, params.max_threads};
for (size_t row = row_begin; row < row_end; ++row)
{
/// Adding Values
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
{
if (aggregate_functions[i]->isParallelizeMergePrepareNeeded())
{
std::vector<AggregateDataPtr> data_vec{res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row]};
aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
}
if (aggregate_functions[i]->isAbleToParallelizeMerge())
aggregate_functions[i]->merge(
res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], thread_pool, result.aggregates_pool);
else
aggregate_functions[i]->merge(
res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
}
}
}
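
For context on the loop above: when an aggregate function reports `isParallelizeMergePrepareNeeded()`, the destination state and the state just read from the stream are handed to `parallelizeMergePrepare` together with the thread pool; for `uniqExact` that is where single-level sets worth converting become two-level, after which the pool-aware `merge` overload can proceed bucket by bucket. A rough standalone sketch of that prepare step (illustrative names, `std::async` standing in for ClickHouse's `ThreadPool`):

```cpp
#include <cstddef>
#include <future>
#include <vector>

struct ToyState
{
    bool is_single_level = true;
    size_t size = 0;
    void convertToTwoLevel() { is_single_level = false; }   /// stand-in for the real conversion
};

/// Before merging, convert every participating state that is still single-level but big
/// enough to the two-level layout; the conversions themselves run as independent tasks.
void parallelizeMergePrepareSketch(std::vector<ToyState *> & states, size_t threshold = 100'000)
{
    std::vector<std::future<void>> tasks;
    for (auto * state : states)
        if (state->is_single_level && state->size > threshold)
            tasks.push_back(std::async(std::launch::async, [state] { state->convertToTwoLevel(); }));
    for (auto & task : tasks)
        task.get();
}
```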

@@ -0,0 +1,22 @@
<test>
<substitutions>
<substitution>
<name>uniq_keys</name>
<values>
<value>100000</value>
<value>250000</value>
<value>500000</value>
<value>1000000</value>
<value>5000000</value>
</values>
</substitution>
</substitutions>
<create_query>create table t_{uniq_keys}(a UInt64) engine=MergeTree order by tuple()</create_query>
<fill_query>insert into t_{uniq_keys} select number % {uniq_keys} from numbers_mt(5e7)</fill_query>
<query>SELECT uniqExact(a) FROM remote('127.0.0.{{1,2}}', default, t_{uniq_keys}) SETTINGS max_threads=5</query>
<drop_query>drop table t_{uniq_keys}</drop_query>
</test>
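
The new performance test exercises exactly this path: `remote('127.0.0.{1,2}', ...)` makes the same server act as two shards and the initiator, so each query forces the initiator to merge two serialized `uniqExact` states, while the `uniq_keys` substitution scales the number of distinct values from the 100'000 conversion threshold up to 5'000'000 and `max_threads=5` gives the thread pool something to parallelize over.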