mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #53782 from ClickHouse/revert-52762-planner-prepare-filters-for-analysis
Revert "Planner prepare filters for analysis"
This commit is contained in:
commit
e67c002cb0
@ -42,7 +42,6 @@
|
|||||||
#include <Storages/ColumnsDescription.h>
|
#include <Storages/ColumnsDescription.h>
|
||||||
#include <Storages/SelectQueryInfo.h>
|
#include <Storages/SelectQueryInfo.h>
|
||||||
#include <Storages/StorageDummy.h>
|
#include <Storages/StorageDummy.h>
|
||||||
#include <Storages/StorageDistributed.h>
|
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
|
|
||||||
#include <Analyzer/Utils.h>
|
#include <Analyzer/Utils.h>
|
||||||
@ -139,84 +138,6 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Storages can rely that filters that for storage will be available for analysis before
|
|
||||||
* getQueryProcessingStage method will be called.
|
|
||||||
*
|
|
||||||
* StorageDistributed skip unused shards optimization relies on this.
|
|
||||||
*
|
|
||||||
* To collect filters that will be applied to specific table in case we have JOINs requires
|
|
||||||
* to run query plan optimization pipeline.
|
|
||||||
*
|
|
||||||
* Algorithm:
|
|
||||||
* 1. Replace all table expressions in query tree with dummy tables.
|
|
||||||
* 2. Build query plan.
|
|
||||||
* 3. Optimize query plan.
|
|
||||||
* 4. Extract filters from ReadFromDummy query plan steps from query plan leaf nodes.
|
|
||||||
*/
|
|
||||||
void collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
|
|
||||||
{
|
|
||||||
bool collect_filters = false;
|
|
||||||
|
|
||||||
for (auto & [table_expression, table_expression_data] : planner_context->getTableExpressionNodeToData())
|
|
||||||
{
|
|
||||||
auto * table_node = table_expression->as<TableNode>();
|
|
||||||
auto * table_function_node = table_expression->as<TableFunctionNode>();
|
|
||||||
if (!table_node && !table_function_node)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
|
|
||||||
if (typeid_cast<const StorageDistributed *>(storage.get()))
|
|
||||||
{
|
|
||||||
collect_filters = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!collect_filters)
|
|
||||||
return;
|
|
||||||
|
|
||||||
ResultReplacementMap replacement_map;
|
|
||||||
auto updated_query_tree = replaceTableExpressionsWithDummyTables(query_tree, planner_context->getQueryContext(), &replacement_map);
|
|
||||||
|
|
||||||
std::unordered_map<const IStorage *, TableExpressionData *> dummy_storage_to_table_expression_data;
|
|
||||||
|
|
||||||
for (auto & [from_table_expression, dummy_table_expression] : replacement_map)
|
|
||||||
{
|
|
||||||
auto * dummy_storage = dummy_table_expression->as<TableNode &>().getStorage().get();
|
|
||||||
auto * table_expression_data = &planner_context->getTableExpressionDataOrThrow(from_table_expression);
|
|
||||||
dummy_storage_to_table_expression_data.emplace(dummy_storage, table_expression_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto & query_context = planner_context->getQueryContext();
|
|
||||||
|
|
||||||
SelectQueryOptions select_query_options;
|
|
||||||
Planner planner(updated_query_tree, select_query_options);
|
|
||||||
planner.buildQueryPlanIfNeeded();
|
|
||||||
|
|
||||||
auto & result_query_plan = planner.getQueryPlan();
|
|
||||||
|
|
||||||
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context);
|
|
||||||
result_query_plan.optimize(optimization_settings);
|
|
||||||
|
|
||||||
std::vector<QueryPlan::Node *> nodes_to_process;
|
|
||||||
nodes_to_process.push_back(result_query_plan.getRootNode());
|
|
||||||
|
|
||||||
while (!nodes_to_process.empty())
|
|
||||||
{
|
|
||||||
const auto * node_to_process = nodes_to_process.back();
|
|
||||||
nodes_to_process.pop_back();
|
|
||||||
nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end());
|
|
||||||
|
|
||||||
auto * read_from_dummy = typeid_cast<ReadFromDummy *>(node_to_process->step.get());
|
|
||||||
if (!read_from_dummy)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
auto filter_actions = ActionsDAG::buildFilterActionsDAG(read_from_dummy->getFilterNodes().nodes, {}, query_context);
|
|
||||||
auto & table_expression_data = dummy_storage_to_table_expression_data.at(&read_from_dummy->getStorage());
|
|
||||||
table_expression_data->setFilterActions(std::move(filter_actions));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Extend lifetime of query context, storages, and table locks
|
/// Extend lifetime of query context, storages, and table locks
|
||||||
void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
|
void extendQueryContextAndStoragesLifetime(QueryPlan & query_plan, const PlannerContextPtr & planner_context)
|
||||||
{
|
{
|
||||||
@ -1305,9 +1226,6 @@ void Planner::buildPlanForQueryNode()
|
|||||||
collectSets(query_tree, *planner_context);
|
collectSets(query_tree, *planner_context);
|
||||||
collectTableExpressionData(query_tree, planner_context);
|
collectTableExpressionData(query_tree, planner_context);
|
||||||
|
|
||||||
if (!select_query_options.only_analyze)
|
|
||||||
collectFiltersForAnalysis(query_tree, planner_context);
|
|
||||||
|
|
||||||
const auto & settings = query_context->getSettingsRef();
|
const auto & settings = query_context->getSettingsRef();
|
||||||
|
|
||||||
/// Check support for JOIN for parallel replicas with custom key
|
/// Check support for JOIN for parallel replicas with custom key
|
||||||
|
@ -544,7 +544,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
|
|||||||
|
|
||||||
auto table_expression_query_info = select_query_info;
|
auto table_expression_query_info = select_query_info;
|
||||||
table_expression_query_info.table_expression = table_expression;
|
table_expression_query_info.table_expression = table_expression;
|
||||||
table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions();
|
|
||||||
|
|
||||||
size_t max_streams = settings.max_threads;
|
size_t max_streams = settings.max_threads;
|
||||||
size_t max_threads_execute_query = settings.max_threads;
|
size_t max_threads_execute_query = settings.max_threads;
|
||||||
|
@ -355,52 +355,24 @@ QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, con
|
|||||||
return function_node;
|
return function_node;
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
|
QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node,
|
||||||
const ContextPtr & context,
|
const ContextPtr & context,
|
||||||
ResultReplacementMap * result_replacement_map)
|
ResultReplacementMap * result_replacement_map)
|
||||||
{
|
{
|
||||||
auto & query_node_typed = query_node->as<QueryNode &>();
|
auto & query_node_typed = query_node->as<QueryNode &>();
|
||||||
auto table_expressions = extractTableExpressions(query_node_typed.getJoinTree());
|
auto table_expressions = extractTableExpressions(query_node_typed.getJoinTree());
|
||||||
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
|
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
|
||||||
size_t subquery_index = 0;
|
|
||||||
|
|
||||||
for (auto & table_expression : table_expressions)
|
for (auto & table_expression : table_expressions)
|
||||||
{
|
{
|
||||||
auto * table_node = table_expression->as<TableNode>();
|
auto * table_node = table_expression->as<TableNode>();
|
||||||
auto * table_function_node = table_expression->as<TableFunctionNode>();
|
auto * table_function_node = table_expression->as<TableFunctionNode>();
|
||||||
auto * subquery_node = table_expression->as<QueryNode>();
|
if (!table_node && !table_function_node)
|
||||||
auto * union_node = table_expression->as<UnionNode>();
|
continue;
|
||||||
|
|
||||||
StoragePtr storage_dummy;
|
|
||||||
|
|
||||||
if (table_node || table_function_node)
|
|
||||||
{
|
|
||||||
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
|
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
|
||||||
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
|
auto storage_dummy = std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(),
|
||||||
|
storage_snapshot->metadata->getColumns());
|
||||||
storage_dummy
|
|
||||||
= std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(), ColumnsDescription(storage_snapshot->getColumns(get_column_options)));
|
|
||||||
}
|
|
||||||
else if (subquery_node || union_node)
|
|
||||||
{
|
|
||||||
const auto & subquery_projection_columns
|
|
||||||
= subquery_node ? subquery_node->getProjectionColumns() : union_node->computeProjectionColumns();
|
|
||||||
|
|
||||||
NameSet unique_column_names;
|
|
||||||
NamesAndTypes storage_dummy_columns;
|
|
||||||
storage_dummy_columns.reserve(subquery_projection_columns.size());
|
|
||||||
|
|
||||||
for (const auto & projection_column : subquery_projection_columns)
|
|
||||||
{
|
|
||||||
auto [_, inserted] = unique_column_names.insert(projection_column.name);
|
|
||||||
if (inserted)
|
|
||||||
storage_dummy_columns.emplace_back(projection_column);
|
|
||||||
}
|
|
||||||
|
|
||||||
storage_dummy = std::make_shared<StorageDummy>(StorageID{"dummy", "subquery_" + std::to_string(subquery_index)}, ColumnsDescription(storage_dummy_columns));
|
|
||||||
++subquery_index;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
|
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
|
||||||
|
|
||||||
if (result_replacement_map)
|
if (result_replacement_map)
|
||||||
|
@ -65,9 +65,9 @@ bool queryHasWithTotalsInAnySubqueryInJoinTree(const QueryTreeNodePtr & query_no
|
|||||||
/// Returns `and` function node that has condition nodes as its arguments
|
/// Returns `and` function node that has condition nodes as its arguments
|
||||||
QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context);
|
QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context);
|
||||||
|
|
||||||
/// Replace table expressions from query JOIN TREE with dummy tables
|
/// Replace tables nodes and table function nodes with dummy table nodes
|
||||||
using ResultReplacementMap = std::unordered_map<QueryTreeNodePtr, QueryTreeNodePtr>;
|
using ResultReplacementMap = std::unordered_map<QueryTreeNodePtr, QueryTreeNodePtr>;
|
||||||
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
|
QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNodePtr & query_node,
|
||||||
const ContextPtr & context,
|
const ContextPtr & context,
|
||||||
ResultReplacementMap * result_replacement_map = nullptr);
|
ResultReplacementMap * result_replacement_map = nullptr);
|
||||||
|
|
||||||
|
@ -140,17 +140,6 @@ void ColumnDescription::readText(ReadBuffer & buf)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnsDescription::ColumnsDescription(std::initializer_list<NameAndTypePair> ordinary)
|
|
||||||
{
|
|
||||||
for (const auto & elem : ordinary)
|
|
||||||
add(ColumnDescription(elem.name, elem.type));
|
|
||||||
}
|
|
||||||
|
|
||||||
ColumnsDescription::ColumnsDescription(NamesAndTypes ordinary)
|
|
||||||
{
|
|
||||||
for (auto & elem : ordinary)
|
|
||||||
add(ColumnDescription(std::move(elem.name), std::move(elem.type)));
|
|
||||||
}
|
|
||||||
|
|
||||||
ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary)
|
ColumnsDescription::ColumnsDescription(NamesAndTypesList ordinary)
|
||||||
{
|
{
|
||||||
|
@ -102,11 +102,6 @@ class ColumnsDescription : public IHints<1, ColumnsDescription>
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ColumnsDescription() = default;
|
ColumnsDescription() = default;
|
||||||
|
|
||||||
ColumnsDescription(std::initializer_list<NameAndTypePair> ordinary);
|
|
||||||
|
|
||||||
explicit ColumnsDescription(NamesAndTypes ordinary);
|
|
||||||
|
|
||||||
explicit ColumnsDescription(NamesAndTypesList ordinary);
|
explicit ColumnsDescription(NamesAndTypesList ordinary);
|
||||||
|
|
||||||
explicit ColumnsDescription(NamesAndTypesList ordinary, NamesAndAliases aliases);
|
explicit ColumnsDescription(NamesAndTypesList ordinary, NamesAndAliases aliases);
|
||||||
|
@ -1304,10 +1304,6 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
|
|||||||
|
|
||||||
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
|
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
|
||||||
|
|
||||||
/// Do not keep data parts in snapshot.
|
|
||||||
/// They are stored separately, and some could be released after PK analysis.
|
|
||||||
auto storage_snapshot_copy = storage_snapshot->clone(std::make_unique<MergeTreeData::SnapshotData>());
|
|
||||||
|
|
||||||
return std::make_unique<ReadFromMergeTree>(
|
return std::make_unique<ReadFromMergeTree>(
|
||||||
std::move(parts),
|
std::move(parts),
|
||||||
std::move(alter_conversions),
|
std::move(alter_conversions),
|
||||||
@ -1315,7 +1311,7 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
|
|||||||
virt_column_names,
|
virt_column_names,
|
||||||
data,
|
data,
|
||||||
query_info,
|
query_info,
|
||||||
storage_snapshot_copy,
|
storage_snapshot,
|
||||||
context,
|
context,
|
||||||
max_block_size,
|
max_block_size,
|
||||||
num_streams,
|
num_streams,
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
class StorageDummy final : public IStorage
|
class StorageDummy : public IStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
StorageDummy(const StorageID & table_id_, const ColumnsDescription & columns_, ColumnsDescription object_columns_ = {});
|
StorageDummy(const StorageID & table_id_, const ColumnsDescription & columns_, ColumnsDescription object_columns_ = {});
|
||||||
@ -46,7 +46,7 @@ private:
|
|||||||
const ColumnsDescription object_columns;
|
const ColumnsDescription object_columns;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ReadFromDummy final : public SourceStepWithFilter
|
class ReadFromDummy : public SourceStepWithFilter
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit ReadFromDummy(const StorageDummy & storage_,
|
explicit ReadFromDummy(const StorageDummy & storage_,
|
||||||
|
@ -255,6 +255,13 @@ void StorageMergeTree::read(
|
|||||||
processed_stage, nullptr, enable_parallel_reading))
|
processed_stage, nullptr, enable_parallel_reading))
|
||||||
query_plan = std::move(*plan);
|
query_plan = std::move(*plan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Now, copy of parts that is required for the query, stored in the processors,
|
||||||
|
/// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning,
|
||||||
|
/// reset them to avoid holding them.
|
||||||
|
auto & snapshot_data = assert_cast<MergeTreeData::SnapshotData &>(*storage_snapshot->data);
|
||||||
|
snapshot_data.parts = {};
|
||||||
|
snapshot_data.alter_conversions = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
|
std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
|
||||||
|
@ -5103,6 +5103,15 @@ void StorageReplicatedMergeTree::read(
|
|||||||
const size_t max_block_size,
|
const size_t max_block_size,
|
||||||
const size_t num_streams)
|
const size_t num_streams)
|
||||||
{
|
{
|
||||||
|
SCOPE_EXIT({
|
||||||
|
/// Now, copy of parts that is required for the query, stored in the processors,
|
||||||
|
/// while snapshot_data.parts includes all parts, even one that had been filtered out with partition pruning,
|
||||||
|
/// reset them to avoid holding them.
|
||||||
|
auto & snapshot_data = assert_cast<MergeTreeData::SnapshotData &>(*storage_snapshot->data);
|
||||||
|
snapshot_data.parts = {};
|
||||||
|
snapshot_data.alter_conversions = {};
|
||||||
|
});
|
||||||
|
|
||||||
const auto & settings = local_context->getSettingsRef();
|
const auto & settings = local_context->getSettingsRef();
|
||||||
|
|
||||||
/// The `select_sequential_consistency` setting has two meanings:
|
/// The `select_sequential_consistency` setting has two meanings:
|
||||||
|
@ -17,16 +17,6 @@ namespace ErrorCodes
|
|||||||
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
|
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<StorageSnapshot> StorageSnapshot::clone(DataPtr data_) const
|
|
||||||
{
|
|
||||||
auto res = std::make_shared<StorageSnapshot>(storage, metadata, object_columns);
|
|
||||||
|
|
||||||
res->projection = projection;
|
|
||||||
res->data = std::move(data_);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
void StorageSnapshot::init()
|
void StorageSnapshot::init()
|
||||||
{
|
{
|
||||||
for (const auto & [name, type] : storage.getVirtuals())
|
for (const auto & [name, type] : storage.getVirtuals())
|
||||||
|
@ -60,8 +60,6 @@ struct StorageSnapshot
|
|||||||
init();
|
init();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<StorageSnapshot> clone(DataPtr data_) const;
|
|
||||||
|
|
||||||
/// Get all available columns with types according to options.
|
/// Get all available columns with types according to options.
|
||||||
NamesAndTypesList getColumns(const GetColumnsOptions & options) const;
|
NamesAndTypesList getColumns(const GetColumnsOptions & options) const;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user