Planner prepare filters for analysis

This commit is contained in:
Maksim Kita 2023-07-29 19:17:14 +03:00
parent 6211845ef0
commit d82a834bec
7 changed files with 116 additions and 10 deletions

View File

@ -138,6 +138,78 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
}
}
/** Storages can rely that filters that for storage will be available for analysis before
* getQueryProcessingStage method will be called.
*
* StorageDistributed skip unused shards optimization relies on this.
*
* To collect filters that will be applied to specific table in case we have JOINs requires
* to run query plan optimization pipeline.
*
* Algorithm:
* 1. Replace all table expressions in query tree with dummy tables.
* 2. Build query plan.
* 3. Optimize query plan.
* 4. Extract filters from ReadFromDummy query plan steps from query plan leaf nodes.
*/
void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
{
bool all_table_expressions_are_dummy = true;
for (auto & [table_expression, table_expression_data] : planner_context->getTableExpressionNodeToData())
{
auto * table_node = table_expression->as<TableNode>();
if (table_node && typeid_cast<const StorageDummy *>(table_node->getStorage().get()))
continue;
all_table_expressions_are_dummy = false;
break;
}
if (all_table_expressions_are_dummy)
return;
ResultReplacementMap replacement_map;
auto updated_query_tree = replaceTableExpressionsWithDummyTables(query_tree, planner_context->getQueryContext(), &replacement_map);
std::unordered_map<const IStorage *, TableExpressionData *> dummy_storage_to_table_expression_data;
for (auto & [from_table_expression, dummy_table_expression] : replacement_map)
{
auto * dummy_storage = dummy_table_expression->as<TableNode &>().getStorage().get();
auto * table_expression_data = &planner_context->getTableExpressionDataOrThrow(from_table_expression);
dummy_storage_to_table_expression_data.emplace(dummy_storage, table_expression_data);
}
const auto & query_context = planner_context->getQueryContext();
Planner planner(updated_query_tree, {});
planner.buildQueryPlanIfNeeded();
auto & result_query_plan = planner.getQueryPlan();
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context);
result_query_plan.optimize(optimization_settings);
std::vector<QueryPlan::Node *> nodes_to_process;
nodes_to_process.push_back(result_query_plan.getRootNode());
while (!nodes_to_process.empty())
{
const auto * node_to_process = nodes_to_process.back();
nodes_to_process.pop_back();
nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end());
auto * read_from_dummy = typeid_cast<ReadFromDummy *>(node_to_process->step.get());
if (!read_from_dummy)
continue;
auto filter_actions = ActionsDAG::buildFilterActionsDAG(read_from_dummy->getFilterNodes().nodes, {}, query_context);
auto & table_expression_data = dummy_storage_to_table_expression_data.at(&read_from_dummy->getStorage());
table_expression_data->setFilterActions(std::move(filter_actions));
}
}
/// Extend lifetime of query context, storages, and table locks
void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
{
@ -1226,6 +1298,9 @@ void Planner::buildPlanForQueryNode()
collectSets(query_tree, *planner_context);
collectTableExpressionData(query_tree, planner_context);
if (!select_query_options.only_analyze)
collectFiltersForAnalysis(query_tree, planner_context);
const auto & settings = query_context->getSettingsRef();
/// Check support for JOIN for parallel replicas with custom key

View File

@ -524,6 +524,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
auto table_expression_query_info = select_query_info;
table_expression_query_info.table_expression = table_expression;
table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions();
size_t max_streams = settings.max_threads;
size_t max_threads_execute_query = settings.max_threads;

View File

@ -355,24 +355,38 @@ QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, con
return function_node;
}
QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node,
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
const ContextPtr & context,
ResultReplacementMap * result_replacement_map)
{
auto & query_node_typed = query_node->as<QueryNode &>();
auto table_expressions = extractTableExpressions(query_node_typed.getJoinTree());
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
size_t subquery_index = 0;
for (auto & table_expression : table_expressions)
{
auto * table_node = table_expression->as<TableNode>();
auto * table_function_node = table_expression->as<TableFunctionNode>();
if (!table_node && !table_function_node)
continue;
auto * subquery_node = table_expression->as<QueryNode>();
auto * union_node = table_expression->as<UnionNode>();
StoragePtr storage_dummy;
if (table_node || table_function_node)
{
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
storage_dummy
= std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(), storage_snapshot->metadata->getColumns());
}
else if (subquery_node || union_node)
{
const auto & projection_columns
= subquery_node ? subquery_node->getProjectionColumns() : union_node->computeProjectionColumns();
storage_dummy = std::make_shared<StorageDummy>(StorageID{"dummy", "subquery_" + std::to_string(subquery_index)}, ColumnsDescription(projection_columns));
++subquery_index;
}
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
auto storage_dummy = std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(),
storage_snapshot->metadata->getColumns());
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
if (result_replacement_map)

View File

@ -65,9 +65,9 @@ bool queryHasWithTotalsInAnySubqueryInJoinTree(const QueryTreeNodePtr & query_no
/// Returns `and` function node that has condition nodes as its arguments
QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context);
/// Replace tables nodes and table function nodes with dummy table nodes
/// Replace table expressions from query JOIN TREE with dummy tables
using ResultReplacementMap = std::unordered_map<QueryTreeNodePtr, QueryTreeNodePtr>;
QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node,
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
const ContextPtr & context,
ResultReplacementMap * result_replacement_map = nullptr);

View File

@ -140,6 +140,17 @@ void ColumnDescription::readText(ReadBuffer & buf)
}
}
/// Create a description that contains one column per given name and type pair, added in the given order.
ColumnsDescription::ColumnsDescription(std::initializer_list<NameAndTypePair> ordinary)
{
    /// Elements of an initializer list are const, so name and type are copied here.
    for (const auto & name_and_type : ordinary)
    {
        ColumnDescription column(name_and_type.name, name_and_type.type);
        add(std::move(column));
    }
}
/// Create a description that contains one column per given name and type pair, added in the given order.
ColumnsDescription::ColumnsDescription(NamesAndTypes ordinary)
{
    /// `ordinary` is taken by value, so names and types are moved out of it instead of being copied.
    for (auto & name_and_type : ordinary)
    {
        ColumnDescription column(std::move(name_and_type.name), std::move(name_and_type.type));
        add(std::move(column));
    }
}
ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary)
{

View File

@ -102,6 +102,11 @@ class ColumnsDescription : public IHints<1, ColumnsDescription>
{
public:
ColumnsDescription() = default;
ColumnsDescription(std::initializer_list<NameAndTypePair> ordinary);
explicit ColumnsDescription(NamesAndTypes ordinary);
explicit ColumnsDescription(NamesAndTypesList ordinary);
explicit ColumnsDescription(NamesAndTypesList ordinary, NamesAndAliases aliases);

View File

@ -8,7 +8,7 @@
namespace DB
{
class StorageDummy : public IStorage
class StorageDummy final : public IStorage
{
public:
StorageDummy(const StorageID & table_id_, const ColumnsDescription & columns_, ColumnsDescription object_columns_ = {});
@ -46,7 +46,7 @@ private:
const ColumnsDescription object_columns;
};
class ReadFromDummy : public SourceStepWithFilter
class ReadFromDummy final : public SourceStepWithFilter
{
public:
explicit ReadFromDummy(const StorageDummy & storage_,