mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
fixed faults on LC
This commit is contained in:
parent
7bbb85dbe5
commit
465dfe47fc
@ -557,30 +557,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
|
|||||||
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
|
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Aggregator::prepareKeysAndInstructions(Columns columns, AggregatedDataVariants & result, ColumnRawPtrs & key_columns,
|
void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
|
||||||
AggregateColumns & aggregate_columns, Columns & materialized_columns,
|
AggregateFunctionInstructions & aggregate_functions_instructions)
|
||||||
AggregateFunctionInstructions & aggregate_functions_instructions)
|
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||||
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
|
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
|
||||||
|
|
||||||
/// Remember the columns we will work with
|
|
||||||
for (size_t i = 0; i < params.keys_size; ++i)
|
|
||||||
{
|
|
||||||
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
|
|
||||||
key_columns[i] = materialized_columns.back().get();
|
|
||||||
|
|
||||||
if (!result.isLowCardinality())
|
|
||||||
{
|
|
||||||
auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
|
|
||||||
if (column_no_lc.get() != key_columns[i])
|
|
||||||
{
|
|
||||||
materialized_columns.emplace_back(std::move(column_no_lc));
|
|
||||||
key_columns[i] = materialized_columns.back().get();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
aggregate_functions_instructions.resize(params.aggregates_size + 1);
|
aggregate_functions_instructions.resize(params.aggregates_size + 1);
|
||||||
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
|
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
|
||||||
|
|
||||||
@ -655,9 +637,26 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
|
|||||||
* To make them work anyway, we materialize them.
|
* To make them work anyway, we materialize them.
|
||||||
*/
|
*/
|
||||||
Columns materialized_columns;
|
Columns materialized_columns;
|
||||||
AggregateFunctionInstructions aggregate_functions_instructions;
|
|
||||||
|
|
||||||
prepareKeysAndInstructions(columns, result, key_columns, aggregate_columns, materialized_columns, aggregate_functions_instructions);
|
/// Remember the columns we will work with
|
||||||
|
for (size_t i = 0; i < params.keys_size; ++i)
|
||||||
|
{
|
||||||
|
materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
|
||||||
|
key_columns[i] = materialized_columns.back().get();
|
||||||
|
|
||||||
|
if (!result.isLowCardinality())
|
||||||
|
{
|
||||||
|
auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
|
||||||
|
if (column_no_lc.get() != key_columns[i])
|
||||||
|
{
|
||||||
|
materialized_columns.emplace_back(std::move(column_no_lc));
|
||||||
|
key_columns[i] = materialized_columns.back().get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AggregateFunctionInstructions aggregate_functions_instructions;
|
||||||
|
prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions);
|
||||||
|
|
||||||
if (isCancelled())
|
if (isCancelled())
|
||||||
return true;
|
return true;
|
||||||
@ -1154,7 +1153,7 @@ void Aggregator::fillAggregateColumnsWithSingleKey(
|
|||||||
|
|
||||||
void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
|
void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
|
||||||
AggregatedDataVariants & data_variants,
|
AggregatedDataVariants & data_variants,
|
||||||
ColumnRawPtrs key_columns,
|
Columns key_columns,
|
||||||
size_t key_row,
|
size_t key_row,
|
||||||
MutableColumns & final_key_columns)
|
MutableColumns & final_key_columns)
|
||||||
{
|
{
|
||||||
@ -1164,7 +1163,7 @@ void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
|
|||||||
|
|
||||||
for (size_t i = 0; i < params.keys_size; ++i)
|
for (size_t i = 0; i < params.keys_size; ++i)
|
||||||
{
|
{
|
||||||
final_key_columns[i]->insertFrom(*key_columns[i], key_row);
|
final_key_columns[i]->insertFrom(*key_columns[i].get(), key_row);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1258,10 +1258,8 @@ protected:
|
|||||||
*/
|
*/
|
||||||
bool checkLimits(size_t result_size, bool & no_more_keys) const;
|
bool checkLimits(size_t result_size, bool & no_more_keys) const;
|
||||||
|
|
||||||
void prepareKeysAndInstructions(
|
void prepareAggregateInstructions(
|
||||||
Columns columns,
|
Columns columns,
|
||||||
AggregatedDataVariants & result,
|
|
||||||
ColumnRawPtrs & key_columns,
|
|
||||||
AggregateColumns & aggregate_columns,
|
AggregateColumns & aggregate_columns,
|
||||||
Columns & materialized_columns,
|
Columns & materialized_columns,
|
||||||
AggregateFunctionInstructions & instructions);
|
AggregateFunctionInstructions & instructions);
|
||||||
@ -1272,7 +1270,7 @@ protected:
|
|||||||
|
|
||||||
void createStatesAndFillKeyColumnsWithSingleKey(
|
void createStatesAndFillKeyColumnsWithSingleKey(
|
||||||
AggregatedDataVariants & data_variants,
|
AggregatedDataVariants & data_variants,
|
||||||
ColumnRawPtrs key_columns, size_t key_row,
|
Columns key_columns, size_t key_row,
|
||||||
MutableColumns & final_key_columns);
|
MutableColumns & final_key_columns);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
#include <Processors/Transforms/AggregatingInOrderTransform.h>
|
#include <Processors/Transforms/AggregatingInOrderTransform.h>
|
||||||
|
#include <DataTypes/DataTypeLowCardinality.h>
|
||||||
#include <utility>
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -12,7 +11,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
|
|||||||
, params(std::move(params_))
|
, params(std::move(params_))
|
||||||
, sort_description(sort_description_)
|
, sort_description(sort_description_)
|
||||||
, group_by_description(group_by_description_)
|
, group_by_description(group_by_description_)
|
||||||
, key_columns(params->params.keys_size)
|
|
||||||
, aggregate_columns(params->params.aggregates_size)
|
, aggregate_columns(params->params.aggregates_size)
|
||||||
, many_data(std::make_shared<ManyAggregatedData>(1))
|
, many_data(std::make_shared<ManyAggregatedData>(1))
|
||||||
, variants(*many_data->variants[0])
|
, variants(*many_data->variants[0])
|
||||||
@ -35,7 +33,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
|
|||||||
|
|
||||||
for (size_t i = 0; i < params->params.keys_size; ++i)
|
for (size_t i = 0; i < params->params.keys_size; ++i)
|
||||||
{
|
{
|
||||||
/// TODO key_columns have low cardinality removed but res_key_columns not
|
|
||||||
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
|
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,7 +44,7 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
|
|||||||
|
|
||||||
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
|
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
|
||||||
|
|
||||||
static bool less(const MutableColumns & lhs, const ColumnRawPtrs & rhs, size_t i, size_t j, const SortDescription & descr)
|
static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
|
||||||
{
|
{
|
||||||
for (const auto & elem : descr)
|
for (const auto & elem : descr)
|
||||||
{
|
{
|
||||||
@ -84,9 +81,16 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
|
|||||||
/// So that key_columns could live longer xD
|
/// So that key_columns could live longer xD
|
||||||
/// Need a better construction probably
|
/// Need a better construction probably
|
||||||
Columns materialized_columns;
|
Columns materialized_columns;
|
||||||
Aggregator::AggregateFunctionInstructions aggregate_function_instructions;
|
|
||||||
|
|
||||||
params->aggregator.prepareKeysAndInstructions(chunk.detachColumns(), variants, key_columns, aggregate_columns, materialized_columns, aggregate_function_instructions);
|
Columns key_columns(params->params.keys_size);
|
||||||
|
for (size_t i = 0; i < params->params.keys_size; ++i)
|
||||||
|
{
|
||||||
|
materialized_columns.push_back(chunk.getColumns().at(params->params.keys[i])->convertToFullColumnIfConst());
|
||||||
|
key_columns[i] = materialized_columns.back();
|
||||||
|
}
|
||||||
|
|
||||||
|
Aggregator::AggregateFunctionInstructions aggregate_function_instructions;
|
||||||
|
params->aggregator.prepareAggregateInstructions(chunk.detachColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions);
|
||||||
|
|
||||||
// std::cerr << "\nPrepared block of size " << rows << "\n";
|
// std::cerr << "\nPrepared block of size " << rows << "\n";
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ private:
|
|||||||
// size_t sz = 0;
|
// size_t sz = 0;
|
||||||
|
|
||||||
size_t res_block_size{};
|
size_t res_block_size{};
|
||||||
|
|
||||||
MutableColumns res_key_columns;
|
MutableColumns res_key_columns;
|
||||||
MutableColumns res_aggregate_columns;
|
MutableColumns res_aggregate_columns;
|
||||||
|
|
||||||
@ -38,7 +39,6 @@ private:
|
|||||||
SortDescription sort_description;
|
SortDescription sort_description;
|
||||||
SortDescription group_by_description;
|
SortDescription group_by_description;
|
||||||
|
|
||||||
ColumnRawPtrs key_columns;
|
|
||||||
Aggregator::AggregateColumns aggregate_columns;
|
Aggregator::AggregateColumns aggregate_columns;
|
||||||
|
|
||||||
ManyAggregatedDataPtr many_data;
|
ManyAggregatedDataPtr many_data;
|
||||||
|
Loading…
Reference in New Issue
Block a user