Fix "Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform"

In case of optimize_aggregation_in_order there will be
ChunkInfoWithAllocatedBytes.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-01-14 16:25:34 +03:00
parent 06402386eb
commit a4c2f23b07
3 changed files with 65 additions and 25 deletions

View File

@ -1,9 +1,9 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Aggregator.h>
namespace DB
{
@ -250,22 +250,30 @@ void GroupingAggregatedTransform::addChunk(Chunk chunk, size_t input)
if (!info)
throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
if (!agg_info)
throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()))
{
Int32 bucket = agg_info->bucket_num;
bool is_overflows = agg_info->is_overflows;
Int32 bucket = agg_info->bucket_num;
bool is_overflows = agg_info->is_overflows;
if (is_overflows)
overflow_chunks.emplace_back(std::move(chunk));
else if (bucket < 0)
if (is_overflows)
overflow_chunks.emplace_back(std::move(chunk));
else if (bucket < 0)
single_level_chunks.emplace_back(std::move(chunk));
else
{
chunks_map[bucket].emplace_back(std::move(chunk));
has_two_level = true;
last_bucket_number[input] = bucket;
}
}
else if (const auto * in_order_info = typeid_cast<const ChunkInfoWithAllocatedBytes *>(info.get()))
{
single_level_chunks.emplace_back(std::move(chunk));
}
else
{
chunks_map[bucket].emplace_back(std::move(chunk));
has_two_level = true;
last_bucket_number[input] = bucket;
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Chunk should have AggregatedChunkInfo/ChunkInfoWithAllocatedBytes in GroupingAggregatedTransform.");
}
}
@ -318,16 +326,27 @@ void MergingAggregatedBucketTransform::transform(Chunk & chunk)
throw Exception("Chunk info was not set for chunk in MergingAggregatedBucketTransform.",
ErrorCodes::LOGICAL_ERROR);
const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(cur_info.get());
if (!agg_info)
throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedBucketTransform.",
ErrorCodes::LOGICAL_ERROR);
if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(cur_info.get()))
{
Block block = header.cloneWithColumns(cur_chunk.detachColumns());
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;
Block block = header.cloneWithColumns(cur_chunk.detachColumns());
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;
blocks_list.emplace_back(std::move(block));
}
else if (const auto * in_order_info = typeid_cast<const ChunkInfoWithAllocatedBytes *>(cur_info.get()))
{
Block block = header.cloneWithColumns(cur_chunk.detachColumns());
block.info.is_overflows = false;
block.info.bucket_num = -1;
blocks_list.emplace_back(std::move(block));
blocks_list.emplace_back(std::move(block));
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Chunk should have AggregatedChunkInfo/ChunkInfoWithAllocatedBytes in MergingAggregatedBucketTransform.");
}
}
auto res_info = std::make_shared<AggregatedChunkInfo>();
@ -379,7 +398,8 @@ void SortingAggregatedTransform::addChunk(Chunk chunk, size_t from_input)
const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
if (!agg_info)
throw Exception("Chunk should have AggregatedChunkInfo in SortingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Chunk should have AggregatedChunkInfo in SortingAggregatedTransform.");
Int32 bucket = agg_info->bucket_num;
bool is_overflows = agg_info->is_overflows;
@ -389,8 +409,10 @@ void SortingAggregatedTransform::addChunk(Chunk chunk, size_t from_input)
else
{
if (chunks[bucket])
throw Exception("SortingAggregatedTransform already got bucket with number " + toString(bucket),
ErrorCodes::LOGICAL_ERROR);
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"SortingAggregatedTransform already got bucket with number {}", bucket);
}
chunks[bucket] = std::move(chunk);
last_bucket_number[from_input] = bucket;

View File

@ -0,0 +1,6 @@
-- { echoOn }
-- regression for optimize_aggregation_in_order
-- that cause "Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform" error
select count() from remote('127.{1,2}', currentDatabase(), data_02177) group by key settings optimize_aggregation_in_order=1;
2

View File

@ -0,0 +1,12 @@
drop table if exists data_02177;
create table data_02177 (key Int) Engine=MergeTree() order by key;
insert into data_02177 values (1);
-- { echoOn }
-- regression for optimize_aggregation_in_order
-- that cause "Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform" error
select count() from remote('127.{1,2}', currentDatabase(), data_02177) group by key settings optimize_aggregation_in_order=1;
-- { echoOff }
drop table data_02177;