Merge pull request #28700 from amosbird/projection-fix16

Fix crash on exception with projection aggregate
This commit is contained in:
Nikolai Kochetov 2021-09-09 15:24:05 +03:00 committed by GitHub
commit 13eb93a9c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 81 additions and 35 deletions

View File

@ -786,7 +786,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
Arena * arena) const
{
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
@ -865,7 +865,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
@ -917,7 +917,7 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
UInt64 num_rows = block.rows();
return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
@ -925,7 +925,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@ -1058,7 +1058,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const
{
Stopwatch watch;
size_t rows = data_variants.size();
@ -1130,7 +1130,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const
{
String tmp_path = params.tmp_volume->getDisk()->getPath();
return writeToTemporaryFile(data_variants, tmp_path);
@ -1192,7 +1192,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out)
IBlockOutputStream & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
@ -2311,7 +2311,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
block.clear();
}
bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys)
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
@ -2661,7 +2661,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
}
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block) const
{
if (!block)
return {};
@ -2753,7 +2753,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
}
void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) const
{
if (result.empty())
return;

View File

@ -506,7 +506,7 @@ struct AggregatedDataVariants : private boost::noncopyable
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
* In this case, the pool will not be able to know with what offsets objects are stored.
*/
Aggregator * aggregator = nullptr;
const Aggregator * aggregator = nullptr;
size_t keys_size{}; /// Number of keys. NOTE do we need this field?
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
@ -975,11 +975,14 @@ public:
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;
bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool & no_more_keys) const;
/// Used for aggregate projection.
bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const;
/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
@ -996,8 +999,6 @@ public:
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
bool mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys);
/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
@ -1007,11 +1008,11 @@ public:
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
@ -1083,7 +1084,7 @@ private:
Poco::Logger * log = &Poco::Logger::get("Aggregator");
/// For external aggregation.
TemporaryFiles temporary_files;
mutable TemporaryFiles temporary_files;
#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
@ -1106,7 +1107,7 @@ private:
/** Call `destroy` methods for states of aggregate functions.
* Used in the exception handler for aggregation, since RAII in this case is not applicable.
*/
void destroyAllAggregateStates(AggregatedDataVariants & result);
void destroyAllAggregateStates(AggregatedDataVariants & result) const;
/// Process one data block, aggregate the data into a hash table.
@ -1136,7 +1137,7 @@ private:
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena);
Arena * arena) const;
static void executeOnIntervalWithoutKeyImpl(
AggregatedDataWithoutKey & res,
@ -1149,7 +1150,7 @@ private:
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out);
IBlockOutputStream & out) const;
/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>
@ -1304,7 +1305,7 @@ private:
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
NestedColumnsHolder & nested_columns_holder) const;
void addSingleKeyToAggregateColumns(
const AggregatedDataVariants & data_variants,

View File

@ -395,9 +395,14 @@ AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformPar
}
AggregatingTransform::AggregatingTransform(
Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
size_t current_variant, size_t max_threads_, size_t temporary_data_merge_threads_)
: IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_))
Block header,
AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data_,
size_t current_variant,
size_t max_threads_,
size_t temporary_data_merge_threads_)
: IProcessor({std::move(header)}, {params_->getHeader()})
, params(std::move(params_))
, key_columns(params->params.keys_size)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
@ -525,7 +530,7 @@ void AggregatingTransform::consume(Chunk chunk)
{
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
block = materializeBlock(block);
if (!params->aggregator.mergeBlock(block, variants, no_more_keys))
if (!params->aggregator.mergeOnBlock(block, variants, no_more_keys))
is_consume_finished = true;
}
else
@ -547,7 +552,7 @@ void AggregatingTransform::initGenerate()
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
{
if (params->only_merge)
params->aggregator.mergeBlock(getInputs().front().getHeader(), variants, no_more_keys);
params->aggregator.mergeOnBlock(getInputs().front().getHeader(), variants, no_more_keys);
else
params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys);
}

View File

@ -27,15 +27,38 @@ public:
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using AggregatorList = std::list<Aggregator>;
using AggregatorListPtr = std::shared_ptr<AggregatorList>;
struct AggregatingTransformParams
{
Aggregator::Params params;
Aggregator aggregator;
/// Each params holds a list of aggregators which are used in query. It's needed because we need
/// to use a pointer of aggregator to proper destroy complex aggregation states on exception
/// (See comments in AggregatedDataVariants). However, this pointer might not be valid because
/// we can have two different aggregators at the same time due to mixed pipeline of aggregate
/// projections, and one of them might gets destroyed before used.
AggregatorListPtr aggregator_list_ptr;
Aggregator & aggregator;
bool final;
bool only_merge = false;
AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_) {}
: params(params_)
, aggregator_list_ptr(std::make_shared<AggregatorList>())
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
{
}
AggregatingTransformParams(const Aggregator::Params & params_, const AggregatorListPtr & aggregator_list_ptr_, bool final_)
: params(params_)
, aggregator_list_ptr(aggregator_list_ptr_)
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
{
}
Block getHeader() const { return aggregator.getHeader(final); }
@ -82,9 +105,13 @@ public:
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_);
/// For Parallel aggregating.
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data, size_t current_variant,
size_t max_threads, size_t temporary_data_merge_threads);
AggregatingTransform(
Block header,
AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data,
size_t current_variant,
size_t max_threads,
size_t temporary_data_merge_threads);
~AggregatingTransform() override;
String getName() const override { return "AggregatingTransform"; }

View File

@ -267,6 +267,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;
AggregatorListPtr aggregator_list_ptr = std::make_shared<AggregatorList>();
// TODO apply in_order_optimization here
auto build_aggregate_pipe = [&](Pipe & pipe, bool projection)
{
@ -306,7 +308,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_count_to_compile_aggregate_expression,
header_before_aggregation); // The source header is also an intermediate header
transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
/// This part is hacky.
/// We want AggregatingTransform to work with aggregate states instead of normal columns.
@ -336,7 +339,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
}
pipe.resize(pipe.numOutputPorts(), true, true);

View File

@ -0,0 +1,9 @@
drop table if exists t;
create table t (x UInt32) engine = MergeTree order by tuple() settings index_granularity = 8;
insert into t select number from numbers(100);
alter table t add projection p (select uniqHLL12(x));
insert into t select number + 100 from numbers(100);
select uniqHLL12(x) from t settings allow_experimental_projection_optimization = 1, max_bytes_to_read=400, max_block_size=8; -- { serverError 307; }
drop table if exists t;