Try to merge projections faster.

Nikolai Kochetov 2021-04-21 19:00:27 +03:00 committed by Amos Bird
parent cd6414639e
commit 3296c9292f
GPG Key ID: 80D430DCBECFEDB4
12 changed files with 281 additions and 50 deletions

View File

@ -480,7 +480,7 @@ void RemoteQueryExecutor::sendExternalTables()
SelectQueryInfo query_info;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
context, QueryProcessingStage::Complete, metadata_snapshot, query_info);
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
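The caller above now passes the metadata snapshot into getQueryProcessingStage(). A minimal sketch of the assumed base-class declaration, inferred from the MergeTreeData override later in this commit:
/// Sketch of the updated IStorage interface (assumed shape, matching the override below):
virtual QueryProcessingStage::Enum getQueryProcessingStage(
    ContextPtr query_context,
    QueryProcessingStage::Enum to_stage,
    const StorageMetadataPtr & metadata_snapshot,   /// newly threaded-through parameter
    SelectQueryInfo & query_info) const;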

View File

@ -1902,6 +1902,87 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
block.clear();
}
bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys)
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
result.init(method_chosen);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
}
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeWithoutKeyStreamsImpl(block, result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
if (auto * memory_tracker = memory_tracker_child->getParent())
current_memory_usage = memory_tracker->get();
/// The total here takes into account results from all threads.
auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;
bool worth_convert_to_two_level
= (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
|| (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
/** Converting to a two-level data structure.
* It allows a more effective merge later on - either memory-efficient or parallel.
*/
if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
result.convertToTwoLevel();
/// Checking the constraints.
if (!checkLimits(result_size, no_more_keys))
return false;
/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation structure is used.
*/
if (params.max_bytes_before_external_group_by
&& result.isTwoLevel()
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
size_t size = current_memory_usage + params.min_free_disk_space;
std::string tmp_path = params.tmp_volume->getDisk()->getPath();
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
// space.
//
// But true reservation (IVolume::reserve()) cannot be used here since
// current_memory_usage does not take compression into account and
// would reserve far more space than will actually be used.
//
// Hence let's do a simple check.
if (!enoughSpaceInDirectory(tmp_path, size))
throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
writeToTemporaryFile(result, tmp_path);
}
return true;
}
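For orientation, a minimal usage sketch of the new mergeBlock() entry point (hypothetical caller, not part of this commit): it consumes blocks that already carry aggregate states, merges them into one AggregatedDataVariants, and returns false once the group-by limits stop consumption, mirroring AggregatingTransform::consume() below.
/// Usage sketch (hypothetical caller and input):
AggregatedDataVariants variants;
bool no_more_keys = false;
for (Block & block : partially_aggregated_blocks)   /// hypothetical source of pre-aggregated blocks
{
    if (!aggregator.mergeBlock(std::move(block), variants, no_more_keys))
        break;   /// group-by limits reached, stop consuming (see checkLimits())
}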
void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
{

View File

@ -983,6 +983,8 @@ public:
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
bool mergeBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys);
/// Merge several partially aggregated blocks into one.
/// Precondition: the block.info.is_overflows flag must be the same for all blocks
/// (either all blocks are from overflow data or none of them are).

View File

@ -282,6 +282,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
checkStackSize();
query_info.ignore_projections = options.ignore_projections;
initSettings();
const Settings & settings = context->getSettingsRef();
@ -393,18 +395,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
view = nullptr;
}
bool use_projection = false;
if (storage && !options.only_analyze)
{
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
{
if (syntax_analyzer_result->can_use_projection)
use_projection = merge_tree->getQueryProcessingStageWithAggregateProjection(
context, options, query_ptr, metadata_snapshot, query_info);
}
}
if (!use_projection && try_move_to_prewhere && storage && query.where() && !query.prewhere())
if (try_move_to_prewhere && storage && query.where() && !query.prewhere())
{
/// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable
if (const auto & column_sizes = storage->getColumnSizes(); !column_sizes.empty())
@ -428,9 +419,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
if (use_projection)
// ugly but works
metadata_snapshot->selected_projection = query_info.aggregate_projection;
// if (use_projection)
// // ugly but works
// metadata_snapshot->selected_projection = query_info.aggregate_projection;
// TODO In expression analyzer we need to check if storage mayBenefitFromIndexForIn. In case
// we use some projection to execute the query, we need to provide what projection we are
@ -566,7 +557,7 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
executeImpl(query_plan, input, std::move(input_pipe));
/// We must guarantee that result structure is the same as in getSampleBlock()
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
if (!options.ignore_projections && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
@ -600,10 +591,10 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (storage && !options.only_analyze)
{
if (query_info.aggregate_projection)
from_stage = QueryProcessingStage::WithMergeableState;
else
from_stage = storage->getQueryProcessingStage(context, options.to_stage, query_info);
from_stage = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info);
/// XXX Used for IN set index analysis. Is this a proper way?
metadata_snapshot->selected_projection = query_info.aggregate_projection;
}
/// Do I need to perform the first part of the pipeline?
@ -1941,6 +1932,9 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
expression_before_aggregation->setStepDescription("Before GROUP BY");
query_plan.addStep(std::move(expression_before_aggregation));
if (options.ignore_projections)
return;
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
@ -1996,6 +1990,9 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
{
const auto & header_before_merge = query_plan.getCurrentDataStream().header;
if (query_info.aggregate_projection)
return;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_merge.getPositionByName(key.name));

View File

@ -91,6 +91,8 @@ public:
const SelectQueryInfo & getQueryInfo() const { return query_info; }
const SelectQueryExpressionAnalyzer * getQueryAnalyzer() const { return query_analyzer.get(); }
const Names & getRequiredColumns() const { return required_columns; }
bool hasAggregation() const { return query_analyzer->hasAggregation(); }

View File

@ -522,8 +522,18 @@ void AggregatingTransform::consume(Chunk chunk)
src_rows += num_rows;
src_bytes += chunk.bytes();
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true;
if (params->only_merge)
{
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
if (!params->aggregator.mergeBlock(block, variants, no_more_keys))
is_consume_finished = true;
}
else
{
if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys))
is_consume_finished = true;
}
}
void AggregatingTransform::initGenerate()

View File

@ -32,6 +32,7 @@ struct AggregatingTransformParams
Aggregator::Params params;
Aggregator aggregator;
bool final;
bool only_merge = false;
AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_) {}
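only_merge is left false by the constructor and switched on by the caller when the input already carries aggregate states; a sketch of the intended pattern, matching the usage in MergeTreeDataSelectExecutor further down:
/// Sketch: enabling merge-only mode for pre-aggregated (projection) input
auto transform_params = std::make_shared<AggregatingTransformParams>(params, /*final*/ true);
transform_params->only_merge = true;   /// states are already aggregated, only merge them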

View File

@ -3804,15 +3804,15 @@ bool MergeTreeData::mayBenefitFromIndexForIn(
bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context,
const SelectQueryOptions & option,
const ASTPtr & query_ptr,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info) const
SelectQueryInfo & query_info)
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || option.ignore_projections)
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
return false;
const auto & query_ptr = query_info.query;
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
auto query_block = select.getSampleBlock();
@ -3884,6 +3884,8 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
{
query_info.projection_names = projection_condition.getRequiredColumns();
query_info.projection_block = query_block;
query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
}
}
@ -3907,6 +3909,22 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
}
QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info) const
{
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
{
if (getQueryProcessingStageWithAggregateProjection(query_context, metadata_snapshot, query_info))
return QueryProcessingStage::Enum::WithMergeableState;
}
return QueryProcessingStage::Enum::FetchColumns;
}
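From the interpreter's side, the returned stage decides how much work the reading step has already done; a condensed sketch of the call site, assembled from the InterpreterSelectQuery hunk above (no new API):
/// WithMergeableState: an aggregate projection matched, the reading step returns
/// pre-aggregated states that only need merging. FetchColumns: plain column reads,
/// the interpreter performs the full aggregation itself.
QueryProcessingStage::Enum from_stage
    = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info);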
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const
{
MergeTreeData * src_data = dynamic_cast<MergeTreeData *>(&source_table);

View File

@ -358,12 +358,16 @@ public:
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
bool getQueryProcessingStageWithAggregateProjection(
static bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context,
const SelectQueryOptions & option,
const ASTPtr & query_ptr,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info) const;
SelectQueryInfo & query_info);
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & info) const override;
ReservationPtr reserveSpace(UInt64 expected_size, VolumePtr & volume) const;

View File

@ -196,6 +196,12 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
rows_without_projection,
rows_with_projection);
std::cerr << "========== Normal parts " << normal_parts.size() << std::endl;
std::cerr << "========== Projec parts " << projection_parts.size() << std::endl;
Pipe projection_pipe;
Pipe ordinary_pipe;
const auto & given_select = query_info.query->as<const ASTSelectQuery &>();
if (!projection_parts.empty())
{
@ -209,10 +215,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
num_streams,
max_block_numbers_to_read);
auto pipe = plan
? plan->convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context))
: Pipe();
if (!pipe.empty())
if (plan)
projection_pipe = plan->convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
if (!projection_pipe.empty())
{
// If `key_actions` is not empty, transform input blocks by adding needed columns
// originated from key columns. We already project the block at the end, using
@ -229,7 +235,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto syntax_result = TreeRewriter(context).analyze(expr, columns);
auto expression = ExpressionAnalyzer(expr, syntax_result, context).getActions(false);
pipe.addSimpleTransform([&expression](const Block & header)
projection_pipe.addSimpleTransform([&expression](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, expression);
});
@ -238,7 +244,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
/// In sample block we use just key columns
if (given_select.where())
{
Block filter_block = pipe.getHeader(); // we can use the previous pipeline's sample block here
Block filter_block = projection_pipe.getHeader(); // we can use the previous pipeline's sample block here
ASTPtr where = given_select.where()->clone();
ProjectionCondition projection_condition(filter_block.getNames(), {});
@ -247,7 +253,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto syntax_result = TreeRewriter(context).analyze(where, filter_block.getNamesAndTypesList());
const auto actions = ExpressionAnalyzer(where, syntax_result, context).getActions(false);
pipe.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType)
projection_pipe.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType)
{
return std::make_shared<FilterTransform>(header, actions, where_column_name, true);
});
@ -255,12 +261,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
// Project columns and set bucket number to -1
// optionally holds the reference of parent parts
pipe.addSimpleTransform([&](const Block & header)
projection_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ProjectionPartTransform>(header, query_info.projection_block, std::move(parent_parts));
});
pipes.push_back(std::move(pipe));
}
}
@ -272,23 +276,131 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (given_select.where())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
// After overriding the group by clause, we finish the possible aggregations directly
if (given_select.groupBy())
if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy())
select.setExpression(ASTSelectQuery::Expression::GROUP_BY, given_select.groupBy()->clone());
auto interpreter = InterpreterSelectQuery(ast, context, storage_from_source_part, nullptr, {processed_stage});
auto pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
auto interpreter = InterpreterSelectQuery(ast, context, storage_from_source_part, nullptr, SelectQueryOptions{processed_stage}.ignoreProjections());
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
if (!pipe.empty())
std::cerr << "========= Ord pipe size " << ordinary_pipe.numOutputPorts() << std::endl;
if (!ordinary_pipe.empty() && processed_stage < QueryProcessingStage::Enum::WithMergeableState)
{
// projection and set bucket number to -1
pipe.addSimpleTransform([&](const Block & header)
ordinary_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ProjectionPartTransform>(header, query_info.projection_block);
});
pipes.push_back(std::move(pipe));
}
}
auto step = std::make_unique<ReadFromStorageStep>(Pipe::unitePipes(std::move(pipes)), "MergeTree(with projection)");
if (processed_stage >= QueryProcessingStage::WithMergeableState)
{
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;
bool overflow_row =
given_select.group_by_with_totals &&
settings.max_rows_to_group_by &&
settings.group_by_overflow_mode == OverflowMode::ANY &&
settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
if (!projection_pipe.empty())
{
const auto & header_before_merge = projection_pipe.getHeader();
std::cerr << "============ header_before_merge\n";
std::cerr << header_before_merge.dumpStructure() << std::endl;
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
keys.push_back(header_before_merge.getPositionByName(key.name));
AggregateDescriptions aggregates = query_info.aggregate_descriptions;
// for (auto & descr : aggregates)
// if (descr.arguments.empty())
// for (const auto & name : descr.argument_names)
// descr.arguments.push_back(header_before_merge.getPositionByName(name));
/// Aggregator::Params params(header_before_merge, keys, query_info.aggregate_descriptions, overflow_row, settings.max_threads);
Aggregator::Params params(header_before_merge, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
//params.intermediate_header = header_before_merge;
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), /*final*/ true);
transform_params->only_merge = true;
projection_pipe.resize(projection_pipe.numOutputPorts(), true, true);
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
projection_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
std::cerr << "========== header after merge " << projection_pipe.getHeader().dumpStructure() << std::endl;
}
if (!ordinary_pipe.empty())
{
const auto & header_before_aggregation = ordinary_pipe.getHeader();
std::cerr << "============ header_before_aggregation\n";
std::cerr << header_before_aggregation.dumpStructure() << std::endl;
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
keys.push_back(header_before_aggregation.getPositionByName(key.name));
AggregateDescriptions aggregates = query_info.aggregate_descriptions;
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
Aggregator::Params params(header_before_aggregation, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), /*final*/ true);
ordinary_pipe.resize(ordinary_pipe.numOutputPorts(), true, true);
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
ordinary_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
std::cerr << "============ header after aggregation\n";
std::cerr << ordinary_pipe.getHeader().dumpStructure() << std::endl;
}
}
pipes.emplace_back(std::move(projection_pipe));
pipes.emplace_back(std::move(ordinary_pipe));
auto pipe = Pipe::unitePipes(std::move(pipes));
pipe.resize(1);
auto step = std::make_unique<ReadFromStorageStep>(std::move(pipe), "MergeTree(with projection)");
auto plan = std::make_unique<QueryPlan>();
plan->addStep(std::move(step));
return plan;

View File

@ -6,6 +6,7 @@
#include <Core/Names.h>
#include <Storages/ProjectionsDescription.h>
#include <Storages/MergeTree/ProjectionKeyActions.h>
#include <Interpreters/AggregateDescription.h>
#include <memory>
@ -151,8 +152,11 @@ struct SelectQueryInfo
ProjectionKeyActions key_actions;
Names projection_names;
Block projection_block;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
bool ignore_projections = false;
/// Store to-be-scanned data parts if some aggregate projection is used
/// TODO Store to-be-scanned data parts if some aggregate projection is used
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
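The two new fields are the hand-off between projection matching and pipe construction; a condensed sketch of that flow, assembled from the hunks above (no new API):
/// Producer: MergeTreeData::getQueryProcessingStageWithAggregateProjection()
query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
/// Consumer: MergeTreeDataSelectExecutor::read(), building the projection pipe
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
    keys.push_back(header_before_merge.getPositionByName(key.name));
AggregateDescriptions aggregates = query_info.aggregate_descriptions;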

View File

@ -358,7 +358,7 @@ Pipe StorageMerge::createSources(
}
auto storage_stage
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, false, metadata_snapshot, modified_query_info);
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, metadata_snapshot, modified_query_info);
if (processed_stage <= storage_stage)
{
/// If there are only virtual columns in query, you must request at least one other column.