Reformat and fix some tests

This commit is contained in:
Amos Bird 2021-04-25 10:26:36 +08:00
parent 233e8bc927
commit ebaf42a448
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
23 changed files with 198 additions and 160 deletions

View File

@ -1928,8 +1928,6 @@ bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool &
else if (result.type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())

View File

@ -396,7 +396,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
// TODO Check if we can have prewhere work for projections, also need to allow it in TreeRewriter
if (try_move_to_prewhere && storage && query.where() && !query.prewhere() && !query.final())
if (try_move_to_prewhere && storage && query.where() && !query.prewhere())
{
/// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable
if (const auto & column_sizes = storage->getColumnSizes(); !column_sizes.empty())
@ -590,7 +590,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
from_stage = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info);
/// XXX Used for IN set index analysis. Is this a proper way?
metadata_snapshot->selected_projection = query_info.aggregate_projection;
metadata_snapshot->selected_projection = query_info.projection;
}
/// Do I need to perform the first part of the pipeline?
@ -1822,7 +1822,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order) && !query_info.aggregate_projection)
if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order) && !query_info.projection)
{
if (analysis_result.optimize_read_in_order)
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
@ -1864,7 +1864,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
backQuoteIfNeed(local_storage_id.getDatabaseName()),
local_storage_id.getFullTableName(),
required_columns,
query_info.aggregate_projection ? query_info.aggregate_projection->name : "");
query_info.projection ? query_info.projection->name : "");
}
/// Create step which reads from empty source if storage has no data.
@ -1988,7 +1988,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
/// It is already added by storage (because of performance issues).
/// TODO: We should probably add another one processing stage for storage?
/// WithMergeableStateAfterAggregation is not ok because, e.g., it skips sorting after aggregation.
if (query_info.aggregate_projection)
if (query_info.projection && query_info.projection->type == "aggregate")
return;
const auto & header_before_merge = query_plan.getCurrentDataStream().header;

View File

@ -951,12 +951,6 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
!select_query->sampleSize() && !select_query->sampleOffset() && !select_query->final() &&
(tables_with_columns.size() < 2 || isLeft(result.analyzed_join->kind()));
// TODO Perhaps we can support distinct or other group by variants for projections
result.can_use_projection = !result.optimize_trivial_count && settings.allow_experimental_projection_optimization
&& !select_query->prewhere() && !select_query->sampleSize() && !select_query->sampleOffset() && !select_query->final()
&& (tables_with_columns.size() < 2) && !select_query->distinct && !select_query->group_by_with_totals
&& !select_query->group_by_with_rollup && !select_query->group_by_with_cube;
return std::make_shared<const TreeRewriterResult>(result);
}

View File

@ -62,8 +62,6 @@ struct TreeRewriterResult
/// instead of actual retrieving columns and counting rows.
bool optimize_trivial_count = false;
bool can_use_projection = false;
/// Cache isRemote() call for storage, because it may be too heavy.
bool is_remote_storage = false;

View File

@ -19,7 +19,7 @@ ReadFromMergeTree::ReadFromMergeTree(
RangesInDataParts parts_,
IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
@ -36,7 +36,7 @@ ReadFromMergeTree::ReadFromMergeTree(
, parts(std::move(parts_))
, index_stats(std::move(index_stats_))
, prewhere_info(std::move(prewhere_info_))
, aggregate_projection(aggregate_projection_)
, projection(projection_)
, virt_column_names(std::move(virt_column_names_))
, settings(std::move(settings_))
, num_streams(num_streams_)
@ -79,7 +79,7 @@ Pipe ReadFromMergeTree::readFromPool()
i, pool, settings.min_marks_for_concurrent_read, settings.max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
storage, metadata_snapshot, settings.use_uncompressed_cache,
prewhere_info, aggregate_projection, settings.reader_settings, virt_column_names);
prewhere_info, projection, settings.reader_settings, virt_column_names);
if (i == 0)
{
@ -99,7 +99,7 @@ ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part)
return std::make_shared<TSource>(
storage, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, settings.use_uncompressed_cache,
prewhere_info, aggregate_projection, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
prewhere_info, projection, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
}
Pipe ReadFromMergeTree::readInOrder()

View File

@ -74,7 +74,7 @@ public:
RangesInDataParts parts_,
IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
@ -100,7 +100,7 @@ private:
RangesInDataParts parts;
IndexStatPtr index_stats;
PrewhereInfoPtr prewhere_info;
const ProjectionDescription * aggregate_projection;
const ProjectionDescription * projection;
Names virt_column_names;
Settings settings;

View File

@ -105,7 +105,7 @@ void IStorage::read(
auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
if (pipe.empty())
{
auto header = (query_info.aggregate_projection ? query_info.aggregate_projection->metadata : metadata_snapshot)
auto header = (query_info.projection ? query_info.projection->metadata : metadata_snapshot)
->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
}

View File

@ -26,7 +26,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -37,7 +37,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info(prewhere_info_)
, aggregate_projection(aggregate_projection_)
, projection(projection_)
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
@ -66,7 +66,7 @@ Chunk MergeTreeBaseSelectProcessor::generate()
if (res.hasRows())
{
injectVirtualColumns(res, task.get(), partition_value_type, virt_column_names);
if (aggregate_projection)
if (projection)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = -1;

View File

@ -24,7 +24,7 @@ public:
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -62,7 +62,7 @@ protected:
StorageMetadataPtr metadata_snapshot;
PrewhereInfoPtr prewhere_info;
const ProjectionDescription * aggregate_projection;
const ProjectionDescription * projection;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -16,9 +16,7 @@
#include <IO/Operators.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/inplaceBlockConversions.h>
@ -3816,12 +3814,26 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult();
/// If first staging query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage, return false
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
/// we cannot use aggregate projection.
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr)
return false;
can_use_aggregate_projection = false;
auto query_block = select.getSampleBlock();
auto required_query = select.getQuery();
const auto & required_query = select.getQuery()->as<const ASTSelectQuery &>();
auto required_predicates = [&required_query]() -> ASTPtr
{
if (required_query.prewhere() && required_query.where())
return makeASTFunction("and", required_query.prewhere()->clone(), required_query.where()->clone());
else if (required_query.prewhere())
return required_query.prewhere()->clone();
else if (required_query.where())
return required_query.where()->clone();
else
return nullptr;
}();
/// Check if all needed columns can be provided by some aggregate projection. Here we also try
/// to find expression matches. For example, suppose an aggregate projection contains a column
@ -3833,11 +3845,18 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
/// The ownership of ProjectionDescription is hold in metadata_snapshot which lives along with
/// InterpreterSelect, thus we can store the raw pointer here.
std::vector<std::pair<const ProjectionDescription *, ProjectionKeyActions>> candidates;
struct ProjectionCandidate
{
const ProjectionDescription * desc;
ProjectionKeyActions key_actions;
Names required_columns;
NameSet required_columns_in_predicate;
};
std::vector<ProjectionCandidate> candidates;
ParserFunction parse_function;
for (const auto & projection : metadata_snapshot->projections)
{
if (projection.type == "aggregate" && !analysis_result.need_aggregate)
if (projection.type == "aggregate" && (!analysis_result.need_aggregate || !can_use_aggregate_projection))
continue;
if (projection.type == "normal" && !(analysis_result.hasWhere() || analysis_result.hasPrewhere()))
@ -3898,18 +3917,13 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
}
ProjectionCondition projection_condition(projection.column_names, required_column_names);
const auto & where = query_ptr->as<const ASTSelectQuery &>().where();
if (where && !projection_condition.check(where))
if (required_predicates && !projection_condition.check(required_predicates))
continue;
candidates.push_back({&projection, std::move(key_actions)});
// A candidate is found, setup needed info but only once.
if (query_info.projection_names.empty())
{
query_info.projection_names = projection_condition.getRequiredColumns();
query_info.projection_block = query_block;
query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
candidates.push_back(
{&projection,
std::move(key_actions),
projection_condition.getRequiredColumns(),
projection_condition.getRequiredColumnsInPredicate()});
}
}
@ -3917,16 +3931,54 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
if (!candidates.empty())
{
size_t min_key_size = std::numeric_limits<size_t>::max();
for (auto & [candidate, key_actions] : candidates)
const ProjectionCandidate * selected_candidate = nullptr;
/// Favor aggregate projections
for (const auto & candidate : candidates)
{
// TODO We choose the projection with least key_size. Perhaps we can do better? (key rollups)
if (candidate->key_size < min_key_size)
if (candidate.desc->type == "aggregate" && candidate.desc->key_size < min_key_size)
{
query_info.aggregate_projection = candidate;
query_info.key_actions = std::move(key_actions);
min_key_size = candidate->key_size;
selected_candidate = &candidate;
min_key_size = candidate.desc->key_size;
}
}
/// Select the best normal projection if no aggregate projection is available
if (!selected_candidate)
{
size_t max_num_of_matched_key_columns = 0;
for (const auto & candidate : candidates)
{
NameSet column_names(
candidate.desc->metadata->sorting_key.column_names.begin(), candidate.desc->metadata->sorting_key.column_names.end());
size_t num_of_matched_key_columns = 0;
for (const auto & name : candidate.desc->metadata->sorting_key.column_names)
{
if (candidate.required_columns_in_predicate.find(name) != candidate.required_columns_in_predicate.end())
++num_of_matched_key_columns;
}
/// Select the normal projection that has the most matched key columns in predicate
/// TODO What's the best strategy here?
if (num_of_matched_key_columns > max_num_of_matched_key_columns)
{
selected_candidate = &candidate;
max_num_of_matched_key_columns = num_of_matched_key_columns;
}
}
}
if (!selected_candidate)
throw Exception("None of the projection candidates is selected", ErrorCodes::LOGICAL_ERROR);
query_info.projection = selected_candidate->desc;
query_info.key_actions = std::move(selected_candidate->key_actions);
query_info.projection_names = std::move(selected_candidate->required_columns);
query_info.projection_block = query_block;
query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
return true;
}
return false;
@ -3943,7 +3995,7 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
{
if (getQueryProcessingStageWithAggregateProjection(query_context, metadata_snapshot, query_info))
{
if (query_info.aggregate_projection->type == "aggregate")
if (query_info.projection->type == "aggregate")
return QueryProcessingStage::Enum::WithMergeableState;
}
}

View File

@ -142,7 +142,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
const auto & settings = context->getSettingsRef();
if (query_info.aggregate_projection == nullptr)
if (query_info.projection == nullptr)
{
if (settings.allow_experimental_projection_optimization && settings.force_optimize_projection
&& !metadata_snapshot->projections.empty())
@ -165,7 +165,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
size_t no_projection_granules = 0;
size_t with_projection_granules = 0;
if (query_info.aggregate_projection->type == "normal")
if (query_info.projection->type == "normal")
plan_no_projections = readFromParts(
data.getDataPartsVector(),
column_names_to_return,
@ -177,7 +177,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
max_block_numbers_to_read,
&no_projection_granules);
LOG_DEBUG(log, "Choose projection {}", query_info.aggregate_projection->name);
LOG_DEBUG(log, "Choose projection {}", query_info.projection->name);
Pipes pipes;
auto parts = data.getDataPartsVector();
@ -187,7 +187,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
for (auto & part : parts)
{
const auto & projections = part->getProjectionParts();
auto it = projections.find(query_info.aggregate_projection->name);
auto it = projections.find(query_info.projection->name);
if (it != projections.end())
{
projection_parts.push_back(it->second);
@ -210,45 +210,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
throw Exception(
ErrorCodes::PROJECTION_NOT_USED,
"No projection is used because there are more than 50% rows don't have projection {}. {} normal rows, {} projection rows",
query_info.aggregate_projection->name,
query_info.projection->name,
rows_without_projection,
rows_with_projection);
// std::cerr << "========== Normal parts " << normal_parts.size() << std::endl;
// std::cerr << "========== Projec parts " << projection_parts.size() << std::endl;
Pipe projection_pipe;
Pipe ordinary_pipe;
const auto & given_select = query_info.query->as<const ASTSelectQuery &>();
if (!projection_parts.empty())
{
auto projection_names = query_info.projection_names;
if (query_info.prewhere_info)
{
const auto prewhere_columns =
(query_info.prewhere_info->alias_actions ?
query_info.prewhere_info->alias_actions :
(query_info.prewhere_info->row_level_filter ?
query_info.prewhere_info->row_level_filter :
query_info.prewhere_info->prewhere_actions))->getRequiredColumns();
NameSet added_column(projection_names.begin(), projection_names.end());
for (const auto & col : prewhere_columns)
{
if (added_column.count(col) == 0)
{
added_column.insert(col);
projection_names.push_back(col);
}
}
}
auto plan = readFromParts(
std::move(projection_parts),
projection_names, // raw columns without key transformation
query_info.aggregate_projection->metadata,
query_info.projection_names, // raw columns without key transformation
query_info.projection->metadata,
query_info,
context,
max_block_size,
@ -258,7 +233,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
true);
if (plan)
projection_pipe = plan->convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
projection_pipe = plan->convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
if (!projection_pipe.empty())
{
@ -277,16 +253,15 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto syntax_result = TreeRewriter(context).analyze(expr, columns);
auto expression = ExpressionAnalyzer(expr, syntax_result, context).getActions(false);
projection_pipe.addSimpleTransform([&expression](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, expression);
});
projection_pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
/// In sample block we use just key columns
if (given_select.where())
{
Block filter_block = projection_pipe.getHeader(); // we can use the previous pipeline's sample block here
// we can use the previous pipeline's sample block here
Block filter_block = projection_pipe.getHeader();
ASTPtr where = given_select.where()->clone();
ProjectionCondition projection_condition(filter_block.getNames(), {});
@ -313,14 +288,16 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!normal_parts.empty())
{
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(std::move(normal_parts));
auto ast = query_info.aggregate_projection->query_ast->clone();
auto ast = query_info.projection->query_ast->clone();
auto & select = ast->as<ASTSelectQuery &>();
if (given_select.where())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
// After overriding the group by clause, we finish the possible aggregations directly
if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy())
select.setExpression(ASTSelectQuery::Expression::GROUP_BY, given_select.groupBy()->clone());
auto interpreter = InterpreterSelectQuery(ast, context, storage_from_source_part, nullptr, SelectQueryOptions{processed_stage}.ignoreAggregation());
auto interpreter = InterpreterSelectQuery(
ast, context, storage_from_source_part, nullptr, SelectQueryOptions{processed_stage}.ignoreAggregation());
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
with_projection_granules += storage_from_source_part->getNumGranulesFromLastRead();
@ -328,19 +305,17 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!ordinary_pipe.empty() && processed_stage < QueryProcessingStage::Enum::WithMergeableState)
{
// projection and set bucket number to -1
ordinary_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ProjectionPartTransform>(header, query_info.projection_block);
});
ordinary_pipe.addSimpleTransform(
[&](const Block & header) { return std::make_shared<ProjectionPartTransform>(header, query_info.projection_block); });
}
}
/// Use normal projection only if we read less granules then without it.
/// TODO: check if read-in-order optimization possible for normal projection.
if (query_info.aggregate_projection->type == "normal" && with_projection_granules > no_projection_granules)
if (query_info.projection->type == "normal" && with_projection_granules > no_projection_granules)
return plan_no_projections;
if (query_info.aggregate_projection->type == "aggregate")
if (query_info.projection->type == "aggregate")
{
/// Here we create shared ManyAggregatedData for both projection and ordinary data.
/// For ordinary data, AggregatedData is filled in a usual way.
@ -356,17 +331,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;
bool overflow_row =
given_select.group_by_with_totals &&
settings.max_rows_to_group_by &&
settings.group_by_overflow_mode == OverflowMode::ANY &&
settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
bool overflow_row = given_select.group_by_with_totals && settings.max_rows_to_group_by
&& settings.group_by_overflow_mode == OverflowMode::ANY && settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
if (!projection_pipe.empty())
{
const auto & header_before_merge = projection_pipe.getHeader();
// std::cerr << "============ header before merge" << std::endl;
// std::cerr << header_before_merge.dumpStructure() << std::endl;
std::cerr << "============ header before merge" << std::endl;
std::cerr << header_before_merge.dumpStructure() << std::endl;
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
keys.push_back(header_before_merge.getPositionByName(key.name));
@ -376,15 +348,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
AggregateDescriptions aggregates = query_info.aggregate_descriptions;
/// Aggregator::Params params(header_before_merge, keys, query_info.aggregate_descriptions, overflow_row, settings.max_threads);
Aggregator::Params params(header_before_merge, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
Aggregator::Params params(
header_before_merge,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), /*final*/ true);
/// This part is hacky.
@ -393,31 +370,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
///
/// It is needed because data in projection:
/// * is not merged completely (we may have states with the same key in different parts)
/// * is not splitted into buckets (so if we just use MergingAggregated, it will use single thread)
/// * is not split into buckets (so if we just use MergingAggregated, it will use single thread)
transform_params->only_merge = true;
projection_pipe.resize(projection_pipe.numOutputPorts(), true, true);
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
projection_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
return std::make_shared<AggregatingTransform>(
header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
// std::cerr << "========== header after merge" << std::endl;
// std::cerr << projection_pipe.getHeader().dumpStructure() << std::endl;
std::cerr << "========== header after merge" << std::endl;
std::cerr << projection_pipe.getHeader().dumpStructure() << std::endl;
}
if (!ordinary_pipe.empty())
{
const auto & header_before_aggregation = ordinary_pipe.getHeader();
// std::cerr << "============ header before aggregation" << std::endl;
// std::cerr << header_before_aggregation.dumpStructure() << std::endl;
std::cerr << "============ header before aggregation" << std::endl;
std::cerr << header_before_aggregation.dumpStructure() << std::endl;
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
@ -429,15 +407,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
Aggregator::Params params(header_before_aggregation, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
Aggregator::Params params(
header_before_aggregation,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), /*final*/ true);
@ -445,16 +428,17 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
ordinary_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
return std::make_shared<AggregatingTransform>(
header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
// std::cerr << "============ header after aggregation" << std::endl;
// std::cerr << ordinary_pipe.getHeader().dumpStructure() << std::endl;
std::cerr << "============ header after aggregation" << std::endl;
std::cerr << ordinary_pipe.getHeader().dumpStructure() << std::endl;
}
}
@ -478,11 +462,11 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
const UInt64 max_block_size,
const unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
size_t * num_granules_are_to_read,
size_t * num_granules_to_read,
bool use_projection_metadata) const
{
const StorageMetadataPtr & metadata_snapshot
= (query_info.aggregate_projection && use_projection_metadata) ? query_info.aggregate_projection->metadata : metadata_snapshot_base;
= (query_info.projection && use_projection_metadata) ? query_info.projection->metadata : metadata_snapshot_base;
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
@ -1140,8 +1124,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
total_marks_pk.load(std::memory_order_relaxed),
sum_marks, sum_ranges);
if (num_granules_are_to_read)
*num_granules_are_to_read = sum_marks_pk.load(std::memory_order_relaxed);
if (num_granules_to_read)
*num_granules_to_read = sum_marks_pk.load(std::memory_order_relaxed);
if (parts_with_ranges.empty())
return std::make_unique<QueryPlan>();
@ -1436,7 +1420,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
std::move(parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.aggregate_projection,
query_info.projection,
virt_columns,
step_settings,
num_streams,
@ -1646,7 +1630,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
std::move(new_parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.aggregate_projection,
query_info.projection,
virt_columns,
step_settings,
num_streams,
@ -1830,7 +1814,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
std::move(new_parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.aggregate_projection,
query_info.projection,
virt_columns,
step_settings,
num_streams,
@ -1922,7 +1906,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
std::move(lonely_parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.aggregate_projection,
query_info.projection,
virt_columns,
step_settings,
num_streams_for_lonely_parts,

View File

@ -46,7 +46,7 @@ public:
UInt64 max_block_size,
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr,
size_t * num_granules_are_to_read = nullptr,
size_t * num_granules_to_read = nullptr,
bool use_projection_metadata = false) const;
private:

View File

@ -23,7 +23,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -32,7 +32,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, aggregate_projection_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},

View File

@ -27,7 +27,7 @@ public:
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -23,7 +23,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -32,7 +32,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, aggregate_projection_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},

View File

@ -27,7 +27,7 @@ public:
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -19,12 +19,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, aggregate_projection_,
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, projection_,
max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},

View File

@ -25,7 +25,7 @@ public:
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * aggregate_projection_,
const ProjectionDescription * projection_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_);

View File

@ -26,6 +26,7 @@ bool ProjectionCondition::check(const ASTPtr & node)
if (key_columns.end() != it)
{
++it->second;
required_columns_in_predicate.insert(name);
return true;
}
else
@ -55,7 +56,7 @@ bool ProjectionCondition::check(const ASTPtr & node)
return false;
}
// TODO zhentianqi other special functions such as joinGet/dictGet
// TODO Need to check other special functions such as joinGet/dictGet
auto name = node->getColumnNameWithoutAlias();
auto it = key_columns.find(name);
if (key_columns.end() != it)

View File

@ -16,10 +16,13 @@ public:
Names getRequiredColumns() const;
NameSet getRequiredColumnsInPredicate() const { return required_columns_in_predicate; }
void rewrite(ASTPtr & node) const;
private:
std::unordered_map<std::string, size_t> key_columns;
NameSet required_columns_in_predicate;
};
}

View File

@ -31,9 +31,17 @@ public:
size_t max_block_size,
unsigned num_streams) override
{
QueryPlan query_plan =
std::move(*MergeTreeDataSelectExecutor(parts.front()->storage)
.readFromParts(parts, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams, nullptr, &num_granules_from_last_read));
QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(parts.front()->storage)
.readFromParts(
parts,
column_names,
metadata_snapshot,
query_info,
context,
max_block_size,
num_streams,
nullptr,
&num_granules_from_last_read));
return query_plan.convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}

View File

@ -147,8 +147,8 @@ struct SelectQueryInfo
ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; }
/// If not null, it means we choose an aggregate projection to execute current query.
const ProjectionDescription * aggregate_projection{};
/// If not null, it means we choose a projection to execute current query.
const ProjectionDescription * projection{};
ProjectionKeyActions key_actions;
Names projection_names;
Block projection_block;

View File

@ -1,6 +1,6 @@
AlterQuery t1 (children 1)
ExpressionList (children 1)
AlterCommand 27 (children 1)
AlterCommand 30 (children 1)
Function equals (children 1)
ExpressionList (children 2)
Identifier date