Fix result for SimpleAggregateFunction with LowCardinality for AggregatingSortedBlockInputStream.

This commit is contained in:
Nikolai Kochetov 2020-01-10 17:01:24 +03:00
parent 64d5789dec
commit 87377431f4
2 changed files with 18 additions and 5 deletions

View File

@ -60,8 +60,6 @@ AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{
ColumnNumbers positions;
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
@ -96,7 +94,7 @@ AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
columns_to_simple_aggregate.emplace_back(std::move(desc));
if (recursiveRemoveLowCardinality(column.type).get() != column.type.get())
positions.emplace_back(i);
converted_lc_columns.emplace_back(i);
}
else
{
@ -105,10 +103,12 @@ AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
}
}
if (!positions.empty())
result_header = header;
if (!converted_lc_columns.empty())
{
for (auto & input : children)
input = std::make_shared<RemovingLowCardinalityBlockInputStream>(input, positions);
input = std::make_shared<RemovingLowCardinalityBlockInputStream>(input, converted_lc_columns);
header = children.at(0)->getHeader();
}
@ -134,6 +134,14 @@ Block AggregatingSortedBlockInputStream::readImpl()
columns_to_aggregate[i] = typeid_cast<ColumnAggregateFunction *>(merged_columns[column_numbers_to_aggregate[i]].get());
merge(merged_columns, queue_without_collation);
for (auto & pos : converted_lc_columns)
{
auto & from_type = header.getByPosition(pos).type;
auto & to_type = result_header.getByPosition(pos).type;
merged_columns[pos] = (*recursiveTypeConversion(std::move(merged_columns[pos]), from_type, to_type)).mutate();
}
return header.cloneWithColumns(std::move(merged_columns));
}

View File

@ -31,6 +31,8 @@ public:
bool isSortedOutput() const override { return true; }
Block getHeader() const override { return result_header; }
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;
@ -52,6 +54,9 @@ private:
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
Block result_header;
ColumnNumbers converted_lc_columns;
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
*/