Implement replica estimation on top of the analyzer

Raúl Marín 2023-10-16 18:24:05 +02:00
parent f799f5d7a1
commit bfb5b8f441
2 changed files with 82 additions and 65 deletions
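The substance of the change: instead of estimating row counts through the legacy InterpreterSelectQuery, the planner now asks the MergeTree storage directly for a row estimate and sizes the number of parallel replicas from it. A minimal standalone sketch of the sizing rule the diff below applies (the function name and shape are illustrative, not the commit's API):

#include <algorithm>
#include <cstdint>

using UInt64 = std::uint64_t;

/// Illustrative sketch, not part of the commit. min_rows_per_replica stands for
/// parallel_replicas_min_number_of_rows_per_replica and is guaranteed positive
/// by the gate that enables estimation in the first place.
UInt64 pickParallelReplicas(UInt64 rows_to_read, UInt64 min_rows_per_replica, UInt64 max_parallel_replicas)
{
    UInt64 replicas = rows_to_read / min_rows_per_replica;
    if (replicas <= 1)
        return 0; /// not enough work: parallel replicas are switched off entirely
    return std::min(replicas, max_parallel_replicas); /// never request more than configured
}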

src/Planner/Planner.cpp

@@ -40,10 +40,11 @@
 #include <Interpreters/StorageID.h>
 #include <Storages/ColumnsDescription.h>
-#include <Storages/SelectQueryInfo.h>
-#include <Storages/StorageDummy.h>
-#include <Storages/StorageDistributed.h>
 #include <Storages/IStorage.h>
+#include <Storages/MergeTree/MergeTreeData.h>
+#include <Storages/SelectQueryInfo.h>
+#include <Storages/StorageDistributed.h>
+#include <Storages/StorageDummy.h>
 
 #include <Analyzer/Utils.h>
 #include <Analyzer/ColumnNode.h>
@@ -143,6 +144,7 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
   * getQueryProcessingStage method will be called.
   *
   * StorageDistributed skip unused shards optimization relies on this.
+  * Parallel replicas estimation relies on this too.
   *
   * To collect filters that will be applied to specific table in case we have JOINs requires
   * to run query plan optimization pipeline.
@@ -156,6 +158,11 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
 void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
 {
     bool collect_filters = false;
 
+    const auto & query_context = planner_context->getQueryContext();
+    const auto & settings = query_context->getSettingsRef();
+    bool parallel_replicas_estimation_enabled
+        = query_context->canUseParallelReplicasOnInitiator() && settings.parallel_replicas_min_number_of_rows_per_replica > 0;
+
     for (auto & [table_expression, table_expression_data] : planner_context->getTableExpressionNodeToData())
     {
@@ -165,7 +172,8 @@ void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
             continue;
 
         const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
-        if (typeid_cast<const StorageDistributed *>(storage.get()))
+        if (typeid_cast<const StorageDistributed *>(storage.get())
+            || (parallel_replicas_estimation_enabled && std::dynamic_pointer_cast<MergeTreeData>(storage)))
         {
             collect_filters = true;
             break;
@@ -187,8 +195,6 @@ void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
         dummy_storage_to_table_expression_data.emplace(dummy_storage, table_expression_data);
     }
 
-    const auto & query_context = planner_context->getQueryContext();
-
     SelectQueryOptions select_query_options;
     Planner planner(updated_query_tree, select_query_options);
     planner.buildQueryPlanIfNeeded();
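In short, Planner.cpp now pre-collects filters not only for StorageDistributed (skip unused shards) but for any MergeTree-family table when the initiator may use parallel replicas and a per-replica row minimum is set; std::dynamic_pointer_cast<MergeTreeData> covers both StorageMergeTree and StorageReplicatedMergeTree, since MergeTreeData is their common base. Restating the gate in condensed form (a paraphrase, not a verbatim excerpt):

bool parallel_replicas_estimation_enabled
    = query_context->canUseParallelReplicasOnInitiator()
    && settings.parallel_replicas_min_number_of_rows_per_replica > 0;

bool needs_early_filter_collection
    = typeid_cast<const StorageDistributed *>(storage.get())              /// skip-unused-shards path
    || (parallel_replicas_estimation_enabled
        && std::dynamic_pointer_cast<MergeTreeData>(storage) != nullptr); /// new estimation path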

src/Planner/PlannerJoinTree.cpp

@@ -526,6 +526,41 @@ FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
     return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context);
 }
 
+static UInt64 mainQueryNodeBlockSizeByLimit(const SelectQueryInfo & select_query_info)
+{
+    auto const & main_query_node = select_query_info.query_tree->as<QueryNode const &>();
+
+    /// Constness of limit and offset is validated during query analysis stage
+    size_t limit_length = 0;
+    if (main_query_node.hasLimit())
+        limit_length = main_query_node.getLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>();
+
+    size_t limit_offset = 0;
+    if (main_query_node.hasOffset())
+        limit_offset = main_query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
+
+    /** If not specified DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY, JOIN, LIMIT BY, LIMIT WITH TIES
+      * but LIMIT is specified, and limit + offset < max_block_size,
+      * then as the block size we will use limit + offset (not to read more from the table than requested),
+      * and also set the number of threads to 1.
+      */
+    if (main_query_node.hasLimit()
+        && !main_query_node.isDistinct()
+        && !main_query_node.isLimitWithTies()
+        && !main_query_node.hasPrewhere()
+        && !main_query_node.hasWhere()
+        && select_query_info.filter_asts.empty()
+        && !main_query_node.hasGroupBy()
+        && !main_query_node.hasHaving()
+        && !main_query_node.hasOrderBy()
+        && !main_query_node.hasLimitBy()
+        && !select_query_info.need_aggregate
+        && !select_query_info.has_window
+        && limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
+        return limit_length + limit_offset;
+
+    return 0;
+}
 
 JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
     const SelectQueryInfo & select_query_info,
     const SelectQueryOptions & select_query_options,
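For intuition, the extracted helper's behaviour on a few hypothetical queries (worked examples, not part of the commit):

    SELECT * FROM t LIMIT 10 OFFSET 20    -> 30  (read at most limit + offset rows)
    SELECT * FROM t WHERE x > 0 LIMIT 10  -> 0   (a filter may discard rows, so the cap is unsafe)
    SELECT DISTINCT x FROM t LIMIT 10     -> 0   (DISTINCT may need to scan far more rows)

A return value of 0 means no safe cap could be derived, and callers treat it as "no limit applies".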
@@ -580,54 +615,27 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
     }
 
     UInt64 max_block_size = settings.max_block_size;
-    auto & main_query_node = select_query_info.query_tree->as<QueryNode &>();
+    UInt64 max_block_size_limited = 0;
     if (is_single_table_expression)
     {
-        size_t limit_length = 0;
-        if (main_query_node.hasLimit())
-        {
-            /// Constness of limit is validated during query analysis stage
-            limit_length = main_query_node.getLimit()->as<ConstantNode &>().getValue().safeGet<UInt64>();
-        }
-
-        size_t limit_offset = 0;
-        if (main_query_node.hasOffset())
-        {
-            /// Constness of offset is validated during query analysis stage
-            limit_offset = main_query_node.getOffset()->as<ConstantNode &>().getValue().safeGet<UInt64>();
-        }
-
-        /** If not specified DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY, JOIN, LIMIT BY, LIMIT WITH TIES
-          * but LIMIT is specified, and limit + offset < max_block_size,
-          * then as the block size we will use limit + offset (not to read more from the table than requested),
-          * and also set the number of threads to 1.
-          */
-        if (main_query_node.hasLimit() &&
-            !main_query_node.isDistinct() &&
-            !main_query_node.isLimitWithTies() &&
-            !main_query_node.hasPrewhere() &&
-            !main_query_node.hasWhere() &&
-            select_query_info.filter_asts.empty() &&
-            !main_query_node.hasGroupBy() &&
-            !main_query_node.hasHaving() &&
-            !main_query_node.hasOrderBy() &&
-            !main_query_node.hasLimitBy() &&
-            !select_query_info.need_aggregate &&
-            !select_query_info.has_window &&
-            limit_length <= std::numeric_limits<UInt64>::max() - limit_offset)
+        max_block_size_limited = mainQueryNodeBlockSizeByLimit(select_query_info);
+        if (max_block_size_limited)
         {
-            if (limit_length + limit_offset < max_block_size)
+            if (max_block_size_limited < max_block_size)
             {
-                max_block_size = std::max<UInt64>(1, limit_length + limit_offset);
+                max_block_size = std::max<UInt64>(1, max_block_size_limited);
                 max_streams = 1;
                 max_threads_execute_query = 1;
             }
 
-            if (limit_length + limit_offset < select_query_info.local_storage_limits.local_limits.size_limits.max_rows)
+            if (max_block_size_limited < select_query_info.local_storage_limits.local_limits.size_limits.max_rows)
             {
-                table_expression_query_info.limit = limit_length + limit_offset;
+                table_expression_query_info.limit = max_block_size_limited;
             }
         }
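Tracing the refactored call site with invented numbers: if settings.max_block_size is 65536 and the query ends in a bare LIMIT 100, mainQueryNodeBlockSizeByLimit returns 100; since 100 < 65536, max_block_size shrinks to 100 and the read runs with max_streams = 1 and max_threads_execute_query = 1, and since 100 is also below size_limits.max_rows it becomes table_expression_query_info.limit as well.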
@@ -689,32 +697,35 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
         if (storage_merge_tree && query_context->canUseParallelReplicasOnInitiator()
             && settings.parallel_replicas_min_number_of_rows_per_replica > 0)
         {
-            /// This is trash
-            /// It uses the old InterpreterSelectQuery to do the estimation of how many rows will be read
-            /// Ideally we should be able to use estimateNumberOfRowsToRead over the storage, but to do this
-            /// properly we need all the actions/filters, which aren't available yet
-            /// If we instead delay this check for later several things will happen:
-            /// * The header might be different (updatePrewhereOutputsIfNeeded)
-            /// * The storage will have been initiated (thus already preparing parallel replicas)
-            auto query_options = SelectQueryOptions(
-                QueryProcessingStage::WithMergeableState,
-                /* depth */ 1,
-                /* is_subquery_= */ true)
-                .ignoreProjections()
-                .ignoreAlias();
-            InterpreterSelectQuery select(
-                table_expression_query_info.original_query,
-                query_context,
-                query_options,
-                table_expression_query_info.prepared_sets);
-            select.adjustParallelReplicasAfterAnalysis();
-            planner_context->getMutableQueryContext()->setSetting(
-                "allow_experimental_parallel_reading_from_replicas",
-                select.getContext()->getSettingsRef().allow_experimental_parallel_reading_from_replicas.operator Field());
-            planner_context->getMutableQueryContext()->setSetting(
-                "max_parallel_replicas", select.getContext()->getSettingsRef().max_parallel_replicas.operator Field());
-        }
+            ActionDAGNodes filter_nodes;
+            if (table_expression_query_info.filter_actions_dag)
+                filter_nodes.nodes = table_expression_query_info.filter_actions_dag->getOutputs();
+
+            UInt64 rows_to_read = storage_merge_tree->estimateNumberOfRowsToRead(
+                query_context, storage_snapshot, table_expression_query_info, filter_nodes);
+
+            if (max_block_size_limited && max_block_size_limited < rows_to_read)
+                rows_to_read = max_block_size_limited;
+
+            size_t number_of_replicas_to_use = rows_to_read / settings.parallel_replicas_min_number_of_rows_per_replica;
+            LOG_TRACE(
+                &Poco::Logger::get("Planner"),
+                "Estimated {} rows to read. It is enough work for {} parallel replicas",
+                rows_to_read,
+                number_of_replicas_to_use);
+
+            if (number_of_replicas_to_use <= 1)
+            {
+                planner_context->getMutableQueryContext()->setSetting(
+                    "allow_experimental_parallel_reading_from_replicas", Field(0));
+                planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", UInt64{0});
+                LOG_DEBUG(&Poco::Logger::get("Planner"), "Disabling parallel replicas because there aren't enough rows to read");
+            }
+            else if (number_of_replicas_to_use < settings.max_parallel_replicas)
+            {
+                planner_context->getMutableQueryContext()->setSetting("max_parallel_replicas", number_of_replicas_to_use);
+                LOG_DEBUG(&Poco::Logger::get("Planner"), "Reducing the number of replicas to use to {}", number_of_replicas_to_use);
+            }
+        }
 
         const auto & prewhere_actions = table_expression_data.getPrewhereFilterActions();
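Putting the new estimation path together with invented numbers: suppose estimateNumberOfRowsToRead returns 10,000,000 rows, no LIMIT cap applies (max_block_size_limited == 0), and parallel_replicas_min_number_of_rows_per_replica is 1,000,000. Then number_of_replicas_to_use = 10, and with max_parallel_replicas = 16 the planner lowers the setting to 10. Had the estimate been 500,000 rows, the quotient would be 0 and parallel replicas would be disabled outright.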