Adding comments and checks.

This commit is contained in:
Nikolai Kochetov 2024-08-26 14:37:02 +00:00
parent 0e136ded28
commit 77061db955
2 changed files with 28 additions and 1 deletion


@@ -10,6 +10,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static bool memoryBoundMergingWillBeUsed(
const DataStream & input_stream,
bool memory_bound_merging_of_aggregation_results_enabled,
@@ -93,6 +98,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
if (memoryBoundMergingWillBeUsed())
{
if (input_streams.front().header.has("__grouping_set"))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory bound merging of aggregated results is not supported for grouping sets.");
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
@@ -123,6 +132,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
if (!memory_efficient_aggregation)
{
if (input_streams.front().header.has("__grouping_set"))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory efficient merging of aggregated results is not supported for grouping sets.");
/// We union several sources into one, parallelizing the work.
pipeline.resize(1);


@@ -13,6 +13,10 @@ namespace ErrorCodes
Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header)
{
/// __grouping_set is neither a GROUP BY key nor an aggregate function.
/// It behaves like a GROUP BY key, but we cannot append it to the keys,
/// because that would change the hashing method and the buckets of two-level aggregation.
/// For now, this column is processed "manually" by merging each group separately.
if (in_header.has("__grouping_set"))
out_header.insert(0, in_header.getByName("__grouping_set"));
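
A toy illustration (not ClickHouse code) of the reasoning in the comment above: in two-level aggregation the target bucket of a row is derived from the hash of its key columns, so treating __grouping_set as one more key would place rows into different buckets than the ones produced by the first aggregation stage. The hash function and the bucket count below are stand-ins chosen only for this example.

#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

/// Stand-in for the per-row bucket choice of two-level aggregation:
/// hash the key columns and pick one of 256 buckets. The real hashing
/// method differs; the point is only that the bucket depends on which
/// columns are treated as keys.
size_t bucketOf(const std::vector<uint64_t> & keys)
{
    size_t hash = 0;
    for (uint64_t key : keys)
        hash = std::hash<uint64_t>{}(hash ^ key);
    return hash % 256;
}

int main()
{
    std::vector<uint64_t> group_by_keys = {42};
    /// The same row with __grouping_set prepended as if it were a key.
    std::vector<uint64_t> with_grouping_set = {1, 42};

    /// The buckets generally differ, so blocks pre-aggregated without the
    /// extra column could not be merged bucket-by-bucket after adding it.
    std::cout << bucketOf(group_by_keys) << " vs " << bucketOf(with_grouping_set) << '\n';
}
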
@@ -39,32 +43,41 @@ void MergingAggregatedTransform::addBlock(Block block)
auto grouping_column = block.getByPosition(grouping_position).column;
block.erase(grouping_position);
/// Split a block by __grouping_set values.
const auto * grouping_column_typed = typeid_cast<const ColumnUInt64 *>(grouping_column.get());
if (!grouping_column_typed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName());
/// Enumerate groups and fill the selector.
std::map<UInt64, size_t> enumerated_groups;
IColumn::Selector selector;
const auto & grouping_data = grouping_column_typed->getData();
size_t num_rows = grouping_data.size();
UInt64 last_group = grouping_data[0];
for (size_t row = 1; row < num_rows; ++row)
{
auto group = grouping_data[row];
/// Optimization for equal ranges.
if (last_group == group)
continue;
/// Optimization for single group.
if (enumerated_groups.empty())
{
selector.reserve(num_rows);
enumerated_groups.emplace(last_group, enumerated_groups.size());
}
/// Fill the last equal range.
selector.resize_fill(row, enumerated_groups[last_group]);
/// Enumerate the new group if we did not see it before.
enumerated_groups.emplace(group, enumerated_groups.size());
last_group = group;
}
/// Optimization for single group.
if (enumerated_groups.empty())
{
auto & bucket_to_blocks = grouping_sets[last_group];
@@ -72,6 +85,7 @@ void MergingAggregatedTransform::addBlock(Block block)
return;
}
/// Fill the last equal range.
selector.resize_fill(num_rows, enumerated_groups[last_group]);
const size_t num_groups = enumerated_groups.size();
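
For reference, a minimal self-contained sketch of the same enumerate-and-fill selector logic, using std::vector and std::map in place of ColumnUInt64 and IColumn::Selector; the sample data and the final splitting loop are illustrative only and do not model the transform's bucket handling.

#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

int main()
{
    /// __grouping_set values of one block, row by row (sample data).
    std::vector<uint64_t> grouping_data = {0, 0, 1, 1, 1, 0, 2};

    std::map<uint64_t, size_t> enumerated_groups;
    std::vector<size_t> selector;

    size_t num_rows = grouping_data.size();
    uint64_t last_group = grouping_data[0];

    for (size_t row = 1; row < num_rows; ++row)
    {
        uint64_t group = grouping_data[row];

        /// Equal ranges are skipped and filled in one resize later.
        if (last_group == group)
            continue;

        /// First change of group: allocate the selector lazily, so a block
        /// with a single group never allocates it at all.
        if (enumerated_groups.empty())
        {
            selector.reserve(num_rows);
            enumerated_groups.emplace(last_group, enumerated_groups.size());
        }

        /// Fill the finished equal range with the index of its group.
        selector.resize(row, enumerated_groups[last_group]);
        /// Register the new group if it was not seen before.
        enumerated_groups.emplace(group, enumerated_groups.size());
        last_group = group;
    }

    /// Single group: the whole block belongs to it, nothing to split.
    if (enumerated_groups.empty())
    {
        std::cout << "single group " << last_group << ", no split needed\n";
        return 0;
    }

    /// Fill the trailing equal range.
    selector.resize(num_rows, enumerated_groups[last_group]);

    /// Split row indices per group using the selector.
    std::vector<std::vector<size_t>> rows_per_group(enumerated_groups.size());
    for (size_t row = 0; row < num_rows; ++row)
        rows_per_group[selector[row]].push_back(row);

    for (const auto & [group, index] : enumerated_groups)
        std::cout << "group " << group << ": " << rows_per_group[index].size() << " rows\n";
}
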