Fix tests

Amos Bird 2021-05-04 18:52:37 +08:00
parent b736515c66
commit ba17acbd63
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
10 changed files with 189 additions and 126 deletions

View File

@@ -443,7 +443,7 @@ NameSet ActionsDAG::foldActionsByProjection(
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name)
{
std::unordered_set<const Node *> visited_nodes;
std::unordered_set<std::string_view> visited_nodes_names;
std::unordered_set<std::string_view> visited_index_names;
std::stack<Node *> stack;
std::vector<const ColumnWithTypeAndName *> missing_input_from_projection_keys;
@@ -452,20 +452,21 @@ NameSet ActionsDAG::foldActionsByProjection(
if (required_columns.find(node->result_name) != required_columns.end() || node->result_name == predicate_column_name)
{
visited_nodes.insert(node);
visited_nodes_names.insert(node->result_name);
visited_index_names.insert(node->result_name);
stack.push(const_cast<Node *>(node));
}
}
for (const auto & column : required_columns)
{
if (visited_nodes_names.find(column) == visited_nodes_names.end())
if (visited_index_names.find(column) == visited_index_names.end())
{
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column))
{
const auto * node = &addInput(*column_with_type_name);
index.push_back(node);
visited_nodes.insert(node);
visited_index_names.insert(column);
}
else
{
@@ -505,7 +506,7 @@ NameSet ActionsDAG::foldActionsByProjection(
nodes.remove_if([&](const Node & node) { return visited_nodes.count(&node) == 0; });
std::erase_if(inputs, [&](const Node * node) { return visited_nodes.count(node) == 0; });
std::erase_if(index, [&](const Node * node) { return visited_nodes.count(node) == 0; });
std::erase_if(index, [&](const Node * node) { return visited_index_names.count(node->result_name) == 0; });
NameSet next_required_columns;
for (const auto & input : inputs)

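A minimal, self-contained sketch of the pruning pattern fixed in the hunk above: nodes are pruned by pointer identity, while the output index is pruned by result name, so an index entry survives as long as some visited node still produces that name (key columns re-added as fresh inputs keep their index slots). The Node type below is a toy stand-in, not ClickHouse's ActionsDAG; std::erase_if requires C++20.

#include <cassert>
#include <list>
#include <string>
#include <unordered_set>
#include <vector>

struct Node { std::string result_name; };

int main()
{
    std::list<Node> nodes;
    nodes.push_back({"x"});      // visited directly
    nodes.push_back({"x + y"});  // matches a projection key
    nodes.push_back({"unused"}); // pruned

    std::vector<const Node *> index;
    for (const auto & node : nodes)
        index.push_back(&node);

    std::unordered_set<const Node *> visited_nodes;
    std::unordered_set<std::string> visited_index_names;
    for (const auto & node : nodes)
    {
        if (node.result_name != "unused")
        {
            visited_nodes.insert(&node);
            visited_index_names.insert(node.result_name);
        }
    }

    /// Nodes: pruned by identity. Index: pruned by name, as in the fix.
    nodes.remove_if([&](const Node & node) { return visited_nodes.count(&node) == 0; });
    std::erase_if(index, [&](const Node * node) { return visited_index_names.count(node->result_name) == 0; });

    assert(nodes.size() == 2);
    assert(index.size() == 2);
}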
View File

@@ -911,6 +911,26 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
bool to_aggregation_stage = false;
bool from_aggregation_stage = false;
/// Whether rows that have not passed max_rows_to_group_by need to be aggregated in a separate overflow row.
bool aggregate_overflow_row =
expressions.need_aggregate &&
query.group_by_with_totals &&
settings.max_rows_to_group_by &&
settings.group_by_overflow_mode == OverflowMode::ANY &&
settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
/// Whether the aggregate functions need to be finalized immediately after the aggregation.
bool aggregate_final =
expressions.need_aggregate &&
options.to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
query_info.projection->aggregate_overflow_row = aggregate_overflow_row;
query_info.projection->aggregate_final = aggregate_final;
}
if (expressions.filter_info)
{
if (!expressions.prewhere_info)
@@ -1026,20 +1046,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (options.to_stage > QueryProcessingStage::FetchColumns)
{
/// Whether rows that have not passed max_rows_to_group_by need to be aggregated in a separate overflow row.
bool aggregate_overflow_row =
expressions.need_aggregate &&
query.group_by_with_totals &&
settings.max_rows_to_group_by &&
settings.group_by_overflow_mode == OverflowMode::ANY &&
settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
/// Whether the aggregate functions need to be finalized immediately after the aggregation.
bool aggregate_final =
expressions.need_aggregate &&
options.to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
auto preliminary_sort = [&]()
{
/** For distributed query processing,
@@ -1437,6 +1443,50 @@ static StreamLocalLimits getLimitsForStorage(const Settings & settings, const Se
return limits;
}
static void executeMergeAggregatedImpl(
QueryPlan & query_plan,
bool overflow_row,
bool final,
bool is_remote_storage,
const Settings & settings,
const NamesAndTypesList & aggregation_keys,
const AggregateDescriptions & aggregates)
{
const auto & header_before_merge = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : aggregation_keys)
keys.push_back(header_before_merge.getPositionByName(key.name));
/** There are two modes of distributed aggregation.
*
* 1. Multiple threads read blocks from the remote servers,
* keep all of them in RAM, and then merge them.
* If the aggregation is two-level, the merge is parallelized across the buckets.
*
* 2. A single thread reads blocks from the different servers in order,
* keeping only one block from each server in RAM.
* If the aggregation is two-level, the blocks of each level are merged sequentially.
*
* The second option consumes less memory (up to 256 times less)
* in the case of two-level aggregation, which is used for large results after GROUP BY,
* but it can be slower.
*/
Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
std::move(transform_params),
settings.distributed_aggregation_memory_efficient && is_remote_storage,
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads);
query_plan.addStep(std::move(merging_aggregated));
}
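A back-of-envelope check of the "up to 256 times less" figure from the comment above, under the assumption that two-level aggregation splits states into 256 buckets: mode 1 keeps every block from every server resident at once, while mode 2 keeps only one in-flight block per server.

#include <cassert>
#include <cstddef>

int main()
{
    const std::size_t servers = 8;
    const std::size_t buckets = 256;                    /// two-level bucket count (assumption)
    const std::size_t blocks_mode1 = servers * buckets; /// all blocks held in RAM at once
    const std::size_t blocks_mode2 = servers;           /// one in-flight block per server
    assert(blocks_mode1 / blocks_mode2 == buckets);     /// the 256x memory factor
}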
void InterpreterSelectQuery::addEmptySourceToQueryPlan(
QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info, ContextPtr context_)
{
@@ -1503,18 +1553,33 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
if (query_info.projection->before_aggregation)
if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_aggregation, ExpressionActionsSettings::fromContext(context_));
pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
if (query_info.projection->before_aggregation)
{
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_aggregation, ExpressionActionsSettings::fromContext(context_));
pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
}
}
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource");
query_plan.addStep(std::move(read_from_pipe));
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
executeMergeAggregatedImpl(
query_plan,
query_info.projection->aggregate_overflow_row,
query_info.projection->aggregate_final,
false,
context_->getSettingsRef(),
query_info.projection->aggregation_keys,
query_info.projection->aggregate_descriptions);
}
}
void InterpreterSelectQuery::addPrewhereAliasActions()
@@ -2049,7 +2114,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
query_plan.addStep(std::move(aggregating_step));
}
void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final)
{
/// If an aggregate projection was chosen for the table, avoid adding MergeAggregated.
@@ -2059,41 +2123,14 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return;
const auto & header_before_merge = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
keys.push_back(header_before_merge.getPositionByName(key.name));
/** There are two modes of distributed aggregation.
*
* 1. Multiple threads read blocks from the remote servers,
* keep all of them in RAM, and then merge them.
* If the aggregation is two-level, the merge is parallelized across the buckets.
*
* 2. A single thread reads blocks from the different servers in order,
* keeping only one block from each server in RAM.
* If the aggregation is two-level, the blocks of each level are merged sequentially.
*
* The second option consumes less memory (up to 256 times less)
* in the case of two-level aggregation, which is used for large results after GROUP BY,
* but it can be slower.
*/
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header_before_merge, keys, query_analyzer->aggregates(), overflow_row, settings.max_threads);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
std::move(transform_params),
settings.distributed_aggregation_memory_efficient && storage && storage->isRemote(),
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads);
query_plan.addStep(std::move(merging_aggregated));
executeMergeAggregatedImpl(
query_plan,
overflow_row,
final,
storage && storage->isRemote(),
context->getSettingsRef(),
query_analyzer->aggregationKeys(),
query_analyzer->aggregates());
}

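The interpreter changes in this file follow one pattern: compute the aggregate_overflow_row and aggregate_final decisions once, before the projection branch, record them on the chosen projection candidate, and route both call sites through the extracted executeMergeAggregatedImpl. A hedged sketch of that pattern, using toy enums and a hypothetical Candidate struct rather than the real interpreter types:

#include <cstdio>

enum class OverflowMode { THROW, BREAK, ANY };
enum class TotalsMode { BEFORE_HAVING, AFTER_HAVING_EXCLUSIVE };

struct Candidate /// hypothetical stand-in for ProjectionCandidate
{
    bool aggregate_overflow_row = false;
    bool aggregate_final = false;
};

/// Mirrors the predicate hoisted in executeImpl: an overflow row is needed only
/// for GROUP BY ... WITH TOTALS under group_by_overflow_mode = 'any'.
static bool needOverflowRow(
    bool need_aggregate, bool with_totals, unsigned max_rows_to_group_by,
    OverflowMode overflow_mode, TotalsMode totals_mode)
{
    return need_aggregate && with_totals && max_rows_to_group_by
        && overflow_mode == OverflowMode::ANY
        && totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
}

/// One shared implementation serving both call sites, as in the refactoring above.
static void mergeAggregatedImpl(bool overflow_row, bool final)
{
    std::printf("merge aggregated: overflow_row=%d final=%d\n", overflow_row, final);
}

int main()
{
    Candidate candidate;
    candidate.aggregate_overflow_row
        = needOverflowRow(true, true, 100000, OverflowMode::ANY, TotalsMode::BEFORE_HAVING);
    candidate.aggregate_final = false; /// WITH TOTALS keeps states unfinalized

    mergeAggregatedImpl(candidate.aggregate_overflow_row, candidate.aggregate_final); /// normal path
    mergeAggregatedImpl(candidate.aggregate_overflow_row, candidate.aggregate_final); /// empty-source path
}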
View File

@@ -3923,11 +3923,14 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
{
if (analysis_result.before_where)
{
candidate.remove_where_filter = analysis_result.remove_where_filter;
candidate.before_where = analysis_result.before_where->clone();
std::cerr << fmt::format("before_where_actions = \n{}", candidate.before_where->dumpDAG()) << std::endl;
required_columns = candidate.before_where->foldActionsByProjection(
required_columns,
projection.sample_block_for_keys,
query_ptr->as<const ASTSelectQuery &>().where()->getColumnName());
std::cerr << fmt::format("before_where_actions = \n{}", candidate.before_where->dumpDAG()) << std::endl;
if (required_columns.empty())
return false;
@@ -3944,9 +3947,13 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
NameSet prewhere_required_columns;
// If there is a before_where action, prewhere_actions only needs the columns required to evaluate the prewhere expression.
// Otherwise it must provide all columns needed by later actions.
NameSet prewhere_required_columns = analysis_result.before_where ? NameSet{} : required_columns;
std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl;
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name);
std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl;
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
@@ -3966,8 +3973,10 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
if (prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl;
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys);
std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl;
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
@@ -4020,8 +4029,12 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
// needs to provide aggregation keys, and certain child DAGs might be substituted by
// keys available in the projection.
candidate.before_aggregation = analysis_result.before_aggregation->clone();
std::cerr << fmt::format("keys = {}", fmt::join(keys, ", ")) << std::endl;
std::cerr << fmt::format("before_aggregation = \n{}", candidate.before_aggregation->dumpDAG()) << std::endl;
auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys);
std::cerr << fmt::format("before_aggregation = \n{}", candidate.before_aggregation->dumpDAG()) << std::endl;
std::cerr << fmt::format("required_columns = {}", fmt::join(required_columns, ", ")) << std::endl;
if (required_columns.empty())
continue;

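The comment added in the prewhere hunk above states the invariant that drives the seeding of prewhere_required_columns: with a later WHERE step, PREWHERE only needs its own expression's columns; without one, it must forward everything downstream actions still need. A small sketch of just that decision, with std::set standing in for NameSet:

#include <cassert>
#include <set>
#include <string>

using NameSet = std::set<std::string>;

/// Sketch of the seeding logic above (names here are illustrative only).
NameSet prewhereRequiredColumns(bool has_before_where, const NameSet & required_columns)
{
    return has_before_where ? NameSet{} : required_columns;
}

int main()
{
    const NameSet required{"x + y", "sum(x - y)"};
    assert(prewhereRequiredColumns(true, required).empty());
    assert(prewhereRequiredColumns(false, required) == required);
}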
View File

@@ -176,9 +176,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
const auto & given_select = query_info.query->as<const ASTSelectQuery &>();
if (query_info.projection->merge_tree_data_select_projection_cache->sum_marks > 0)
{
LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", "));
auto plan = readFromParts(
{},
query_info.projection->required_columns, // raw columns without key transformation
query_info.projection->required_columns,
metadata_snapshot,
query_info.projection->desc->metadata,
query_info,
@@ -189,10 +190,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
query_info.projection->merge_tree_data_select_projection_cache.get());
if (plan)
projection_pipe = plan->convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
if (!projection_pipe.empty())
{
// If `before_where` is not empty, transform input blocks by adding needed columns
// originating from key columns. We already project the block at the end, using
@@ -200,19 +197,27 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
// NOTE: prewhere is executed inside readFromParts
if (query_info.projection->before_where)
{
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_where, ExpressionActionsSettings::fromContext(context));
projection_pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
LOG_DEBUG(log, "projection before_where: {}", query_info.projection->before_where->dumpDAG());
auto where_step = std::make_unique<FilterStep>(
plan->getCurrentDataStream(),
query_info.projection->before_where,
given_select.where()->getColumnName(),
query_info.projection->remove_where_filter);
where_step->setStepDescription("WHERE");
plan->addStep(std::move(where_step));
}
if (query_info.projection->before_aggregation)
{
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_aggregation, ExpressionActionsSettings::fromContext(context));
projection_pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
LOG_DEBUG(log, "projection before_aggregation: {}", query_info.projection->before_aggregation->dumpDAG());
auto expression_before_aggregation
= std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), query_info.projection->before_aggregation);
expression_before_aggregation->setStepDescription("Before GROUP BY");
plan->addStep(std::move(expression_before_aggregation));
}
projection_pipe = plan->convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
}
@@ -251,9 +256,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;
bool overflow_row = given_select.group_by_with_totals && settings.max_rows_to_group_by
&& settings.group_by_overflow_mode == OverflowMode::ANY && settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
if (!projection_pipe.empty())
{
const auto & header_before_merge = projection_pipe.getHeader();
@@ -265,14 +267,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
/// Aggregates are needed to calculate the proper header for the AggregatingTransform result.
/// However, they are not filled, because the header from the projection pipe already contains aggregate functions.
AggregateDescriptions aggregates = query_info.projection->aggregate_descriptions;
const AggregateDescriptions & aggregates = query_info.projection->aggregate_descriptions;
/// Aggregator::Params params(header_before_merge, keys, query_info.aggregate_descriptions, overflow_row, settings.max_threads);
Aggregator::Params params(
header_before_merge,
keys,
aggregates,
overflow_row,
query_info.projection->aggregate_overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
@@ -283,7 +285,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), /*final*/ true);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
/// This part is hacky.
/// We want AggregatingTransform to work with aggregate states instead of normal columns.
/// It is almost the same, except that instead of adding new data to the aggregation state, we merge it with the existing state.
@@ -331,7 +333,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
header_before_aggregation,
keys,
aggregates,
overflow_row,
query_info.projection->aggregate_overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
@@ -342,7 +344,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), /*final*/ true);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
ordinary_pipe.resize(ordinary_pipe.numOutputPorts(), true, true);

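The executor changes above stop patching the projection pipe after conversion and instead add FilterStep and ExpressionStep to the QueryPlan first, calling convertToPipe only once all steps are in place. A toy model (not ClickHouse's QueryPlan) of why the conversion has to come last: a plan collects steps lazily, a pipe is a materialized composition, and anything added after conversion does not take effect.

#include <cassert>
#include <functional>
#include <vector>

using Step = std::function<int(int)>;

struct Plan
{
    std::vector<Step> steps;

    void addStep(Step step) { steps.push_back(std::move(step)); }

    /// Materialize the current steps into one composed "pipe".
    Step convertToPipe() const
    {
        return [steps = steps](int x)
        {
            for (const auto & step : steps)
                x = step(x);
            return x;
        };
    }
};

int main()
{
    Plan plan;
    plan.addStep([](int x) { return x * 2; }); /// stands in for the WHERE FilterStep
    plan.addStep([](int x) { return x + 1; }); /// stands in for "Before GROUP BY"

    Step pipe = plan.convertToPipe();          /// convert only after all steps are added
    assert(pipe(10) == 21);

    plan.addStep([](int x) { return x - 5; }); /// too late: the existing pipe is unaffected
    assert(pipe(10) == 21);
}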
View File

@@ -637,17 +637,14 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
return read_result;
bool has_columns = false;
for (auto & column : columns)
{
if (column)
has_columns = true;
}
size_t total_bytes = 0;
for (auto & column : columns)
{
if (column)
{
total_bytes += column->byteSize();
has_columns = true;
}
}
read_result.addNumBytesRead(total_bytes);

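The MergeTreeRangeReader hunk above fuses two passes over the columns into one, setting has_columns while summing byte sizes. A minimal sketch with a toy column type (byteSize is the only member assumed here):

#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct Column
{
    std::size_t bytes = 0;
    std::size_t byteSize() const { return bytes; }
};
using ColumnPtr = std::shared_ptr<Column>;

int main()
{
    std::vector<ColumnPtr> columns
        = {std::make_shared<Column>(Column{100}), nullptr, std::make_shared<Column>(Column{50})};

    /// One pass instead of two: detect presence and accumulate size together.
    bool has_columns = false;
    std::size_t total_bytes = 0;
    for (const auto & column : columns)
    {
        if (column)
        {
            total_bytes += column->byteSize();
            has_columns = true;
        }
    }

    assert(has_columns);
    assert(total_bytes == 150);
}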
View File

@@ -126,10 +126,13 @@ struct ProjectionCandidate
const ProjectionDescription * desc;
PrewhereInfoPtr prewhere_info;
ActionsDAGPtr before_where;
bool remove_where_filter = false;
ActionsDAGPtr before_aggregation;
Names required_columns;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
bool aggregate_overflow_row = false;
bool aggregate_final = false;
bool complete = false;
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;

View File

@@ -1,36 +1,36 @@
166665 4999980
166664 4999950
166663 4999920
166662 4999890
166661 4999860
"rows_read": 166668,
333330 9999960
333328 9999900
333326 9999840
333324 9999780
333322 9999720
"rows_read": 166668,
83331 55554 9999810
83330 55553 9999690
83328 55552 9999450
83327 55551 9999330
83325 55550 9999090
"rows_read": 166668,
166666 2499970000080
166665 2499940000350
166664 2499910000800
166663 2499880001430
166662 2499850002240
"rows_read": 166668,
166666 -333334 4999980
166665 -333332 4999950
166664 -333330 4999920
166663 -333328 4999890
166662 -333326 4999860
"rows_read": 1000000,
166665 4999980
166663 4999920
166661 4999860
166659 4999800
166657 4999740
"rows_read": 166668,
15 480
14 450
13 420
12 390
11 360
"rows_read": 18,
30 960
28 900
26 840
24 780
22 720
"rows_read": 18,
6 4 810
5 3 690
7 5 480
3 2 450
7 4 450
"rows_read": 18,
16 22080
15 19350
14 16800
13 14430
12 12240
"rows_read": 18,
16 -34 480
15 -32 450
14 -30 420
13 -28 390
12 -26 360
"rows_read": 100,
15 480
13 420
11 360
9 300
7 240
"rows_read": 18,

View File

@@ -5,8 +5,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_agg_proj (x Int32, y Int32, PROJECTION x_plus_y (SELECT sum(x - y), argMax(x, y) group by x + y) TYPE aggregate) ENGINE = MergeTree ORDER BY tuple()"
$CLICKHOUSE_CLIENT -q "insert into test_agg_proj select intDiv(number, 2), -intDiv(number,3) - 1 from numbers(1000000)"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_agg_proj (x Int32, y Int32, PROJECTION x_plus_y (SELECT sum(x - y), argMax(x, y) group by x + y) TYPE aggregate) ENGINE = MergeTree ORDER BY tuple() settings index_granularity = 1"
$CLICKHOUSE_CLIENT -q "insert into test_agg_proj select intDiv(number, 2), -intDiv(number,3) - 1 from numbers(100)"
$CLICKHOUSE_CLIENT -q "select x + y, sum(x - y) as s from test_agg_proj group by x + y order by s desc limit 5 settings allow_experimental_projection_optimization=1"
$CLICKHOUSE_CLIENT -q "select x + y, sum(x - y) as s from test_agg_proj group by x + y order by s desc limit 5 settings allow_experimental_projection_optimization=1 format JSON" | grep "rows_read"

View File

@@ -16,10 +16,17 @@ select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((k
drop row policy if exists filter on projection_test;
create row policy filter on projection_test using (domain = 'non_existing_domain') to all;
-- prewhere with alias with row policy
-- prewhere with alias with row policy (non existing)
select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test prewhere domain_alias = 1 where domain = '1' group by dt_m order by dt_m;
drop row policy filter on projection_test;
-- TODO: there is a bug in the row policy filter (not related to projections; it crashes in master)
-- drop row policy if exists filter on projection_test;
-- create row policy filter on projection_test using (domain != '1') to all;
-- prewhere with alias with row policy (existing)
-- select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test prewhere domain_alias = 1 where domain = '1' group by dt_m order by dt_m;
-- drop row policy filter on projection_test;
select toStartOfMinute(datetime) dt_m, count(), sum(block_count) / sum(duration), avg(block_count / duration) from projection_test group by dt_m order by dt_m;
-- TODO: figure out how to deal with conflicting column names

View File

@@ -392,6 +392,9 @@
"01475_read_subcolumns_storages",
"01674_clickhouse_client_query_param_cte",
"01666_merge_tree_max_query_limit",
"01710_projections.sql",
"01710_normal_projections.sql",
"01710_aggregate_projections.sql",
"01786_explain_merge_tree",
"01666_merge_tree_max_query_limit",
"01802_test_postgresql_protocol_with_row_policy", /// It cannot parse DROP ROW POLICY