#include <Planner/Utils.h>

#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>

#include <Columns/getLeastSuperColumn.h>
#include <Columns/ColumnSet.h>

#include <IO/WriteBufferFromString.h>

#include <Functions/FunctionFactory.h>
#include <Functions/indexHint.h>

#include <Storages/StorageDummy.h>

#include <Interpreters/Context.h>

#include <Analyzer/Utils.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>

#include <Planner/PlannerActionsVisitor.h>
#include <Planner/CollectTableExpressionData.h>
#include <Planner/CollectSets.h>

#include <stack>

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
    extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
}

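/// Dump query plan to a string with detailed EXPLAIN options enabled. Useful for logging and debugging.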
String dumpQueryPlan(QueryPlan & query_plan)
{
    WriteBufferFromOwnString query_plan_buffer;
    query_plan.explainPlan(query_plan_buffer, QueryPlan::ExplainPlanOptions{true, true, true, true});

    return query_plan_buffer.str();
}

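/// Dump the query pipeline built from the query plan to a string.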
String dumpQueryPipeline(QueryPlan & query_plan)
{
    QueryPlan::ExplainPipelineOptions explain_pipeline;
    WriteBufferFromOwnString query_pipeline_buffer;
    query_plan.explainPipeline(query_pipeline_buffer, explain_pipeline);

    return query_pipeline_buffer.str();
}

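/// Build the common header for a UNION/INTERSECT/EXCEPT step: checks that all query headers have the same
/// number of columns and computes the least common supertype column for every position.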
Block buildCommonHeaderForUnion(const Blocks & queries_headers, SelectUnionMode union_mode)
{
    size_t num_selects = queries_headers.size();
    Block common_header = queries_headers.front();
    size_t columns_size = common_header.columns();

    for (size_t query_number = 1; query_number < num_selects; ++query_number)
    {
        int error_code = 0;

        if (union_mode == SelectUnionMode::UNION_DEFAULT ||
            union_mode == SelectUnionMode::UNION_ALL ||
            union_mode == SelectUnionMode::UNION_DISTINCT)
            error_code = ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH;
        else
            error_code = ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;

        if (queries_headers.at(query_number).columns() != columns_size)
            throw Exception(error_code,
                "Different number of columns in {} elements: {} and {}",
                toString(union_mode),
                common_header.dumpNames(),
                queries_headers[query_number].dumpNames());
    }

    std::vector<const ColumnWithTypeAndName *> columns(num_selects);

    for (size_t column_number = 0; column_number < columns_size; ++column_number)
    {
        for (size_t i = 0; i < num_selects; ++i)
            columns[i] = &queries_headers[i].getByPosition(column_number);

        ColumnWithTypeAndName & result_element = common_header.getByPosition(column_number);
        result_element = getLeastSuperColumn(columns);
    }

    return common_header;
}

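/// Convert a query tree node back to an AST select query, unwrapping ASTSelectWithUnionQuery and ASTSubquery wrappers.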
ASTPtr queryNodeToSelectQuery(const QueryTreeNodePtr & query_node)
{
    auto & query_node_typed = query_node->as<QueryNode &>();

    // In case of cross-replication we don't know what database is used for the table.
    // Each shard will use the default database (in the case of cross-replication shards may have different defaults).
    auto result_ast = query_node_typed.toAST({ .qualify_indentifiers_with_database = false });

    while (true)
    {
        if (auto * select_query = result_ast->as<ASTSelectQuery>())
            break;
        else if (auto * select_with_union = result_ast->as<ASTSelectWithUnionQuery>())
            result_ast = select_with_union->list_of_selects->children.at(0);
        else if (auto * subquery = result_ast->as<ASTSubquery>())
            result_ast = subquery->children.at(0);
        else
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Query node invalid conversion to select query");
    }

    if (result_ast == nullptr)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Query node invalid conversion to select query");

    return result_ast;
}

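/// Reset cte_name for every subquery in the AST so that subqueries are always serialized in full.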
static void removeCTEs(ASTPtr & ast)
{
    std::stack<IAST *> stack;
    stack.push(ast.get());
    while (!stack.empty())
    {
        auto * node = stack.top();
        stack.pop();

        if (auto * subquery = typeid_cast<ASTSubquery *>(node))
            subquery->cte_name = {};

        for (const auto & child : node->children)
            stack.push(child.get());
    }
}

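/// Convert a query tree node to an AST select query suitable for sending to remote shards.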
ASTPtr queryNodeToDistributedSelectQuery(const QueryTreeNodePtr & query_node)
{
    auto ast = queryNodeToSelectQuery(query_node);
    /// Remove CTEs information from distributed queries.
    /// Now, if cte_name is set for subquery node, AST -> String serialization will only print cte name.
    /// But CTE is defined only for top-level query part, so may not be sent.
    /// Removing cte_name forces subquery to be always printed.
    removeCTEs(ast);
    return ast;
}

namespace
{

StreamLocalLimits getLimitsForStorage(const Settings & settings, const SelectQueryOptions & options)
{
    StreamLocalLimits limits;
    limits.mode = LimitsMode::LIMITS_TOTAL;
    limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
    limits.speed_limits.max_execution_time = settings.max_execution_time;
    limits.timeout_overflow_mode = settings.timeout_overflow_mode;

    /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
      * because the initiating server has a summary of the execution of the request on all servers.
      *
      * But limits on data size to read and maximum execution time are reasonable to check both on initiator and
      * additionally on each remote server, because these limits are checked per block of data processed,
      * and remote servers may process way more blocks of data than are received by initiator.
      *
      * The limits to throttle maximum execution speed are also checked on all servers.
      */
    if (options.to_stage == QueryProcessingStage::Complete)
    {
        limits.speed_limits.min_execution_rps = settings.min_execution_speed;
        limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
    }

    limits.speed_limits.max_execution_rps = settings.max_execution_speed;
    limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
    limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
    limits.speed_limits.max_estimated_execution_time = settings.max_estimated_execution_time;

    return limits;
}

}

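/// Build local and leaf storage limits from the query context settings, unless limits are ignored in the select query options.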
StorageLimits buildStorageLimits(const Context & context, const SelectQueryOptions & options)
{
    const auto & settings = context.getSettingsRef();

    StreamLocalLimits limits;
    SizeLimits leaf_limits;

    /// Set the limits and quota for reading data, the speed and time of the query.
    if (!options.ignore_limits)
    {
        limits = getLimitsForStorage(settings, options);
        leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf, settings.read_overflow_mode_leaf);
    }

    return {limits, leaf_limits};
}

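/// Build an ActionsDAG for the given expression node over the provided input columns;
/// the DAG outputs are the nodes produced for the expression.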
ActionsDAG buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
    const ColumnsWithTypeAndName & input_columns,
    const PlannerContextPtr & planner_context)
{
    ActionsDAG action_dag(input_columns);
    PlannerActionsVisitor actions_visitor(planner_context);
    auto expression_dag_index_nodes = actions_visitor.visit(action_dag, expression_node);
    action_dag.getOutputs() = std::move(expression_dag_index_nodes);

    return action_dag;
}

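/// Returns true if the prefix sort description is a prefix of the full sort description.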
bool sortDescriptionIsPrefix(const SortDescription & prefix, const SortDescription & full)
{
    size_t prefix_size = prefix.size();
    if (prefix_size > full.size())
        return false;

    for (size_t i = 0; i < prefix_size; ++i)
    {
        if (full[i] != prefix[i])
            return false;
    }

    return true;
}

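/// Returns true if the query join tree contains an ARRAY JOIN node.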
bool queryHasArrayJoinInJoinTree(const QueryTreeNodePtr & query_node)
{
    const auto & query_node_typed = query_node->as<const QueryNode &>();

    std::vector<QueryTreeNodePtr> join_tree_nodes_to_process;
    join_tree_nodes_to_process.push_back(query_node_typed.getJoinTree());

    while (!join_tree_nodes_to_process.empty())
    {
        auto join_tree_node_to_process = join_tree_nodes_to_process.back();
        join_tree_nodes_to_process.pop_back();

        auto join_tree_node_type = join_tree_node_to_process->getNodeType();

        switch (join_tree_node_type)
        {
            case QueryTreeNodeType::TABLE:
                [[fallthrough]];
            case QueryTreeNodeType::QUERY:
                [[fallthrough]];
            case QueryTreeNodeType::UNION:
                [[fallthrough]];
            case QueryTreeNodeType::TABLE_FUNCTION:
            {
                break;
            }
            case QueryTreeNodeType::ARRAY_JOIN:
            {
                return true;
            }
            case QueryTreeNodeType::JOIN:
            {
                auto & join_node = join_tree_node_to_process->as<JoinNode &>();
                join_tree_nodes_to_process.push_back(join_node.getLeftTableExpression());
                join_tree_nodes_to_process.push_back(join_node.getRightTableExpression());
                break;
            }
            default:
            {
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                    "Unexpected node type for table expression. "
                    "Expected table, table function, query, union, join or array join. Actual {}",
                    join_tree_node_to_process->getNodeTypeName());
            }
        }
    }

    return false;
}

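/// Returns true if any subquery in the query join tree uses GROUP BY ... WITH TOTALS.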
bool queryHasWithTotalsInAnySubqueryInJoinTree(const QueryTreeNodePtr & query_node)
{
    const auto & query_node_typed = query_node->as<const QueryNode &>();

    std::vector<QueryTreeNodePtr> join_tree_nodes_to_process;
    join_tree_nodes_to_process.push_back(query_node_typed.getJoinTree());

    while (!join_tree_nodes_to_process.empty())
    {
        auto join_tree_node_to_process = join_tree_nodes_to_process.back();
        join_tree_nodes_to_process.pop_back();

        auto join_tree_node_type = join_tree_node_to_process->getNodeType();

        switch (join_tree_node_type)
        {
            case QueryTreeNodeType::TABLE:
                [[fallthrough]];
            case QueryTreeNodeType::TABLE_FUNCTION:
            {
                break;
            }
            case QueryTreeNodeType::QUERY:
            {
                auto & query_node_to_process = join_tree_node_to_process->as<QueryNode &>();
                if (query_node_to_process.isGroupByWithTotals())
                    return true;

                join_tree_nodes_to_process.push_back(query_node_to_process.getJoinTree());
                break;
            }
            case QueryTreeNodeType::UNION:
            {
                auto & union_node = join_tree_node_to_process->as<UnionNode &>();
                auto & union_queries = union_node.getQueries().getNodes();

                for (auto & union_query : union_queries)
                    join_tree_nodes_to_process.push_back(union_query);
                break;
            }
            case QueryTreeNodeType::ARRAY_JOIN:
            {
                auto & array_join_node = join_tree_node_to_process->as<ArrayJoinNode &>();
                join_tree_nodes_to_process.push_back(array_join_node.getTableExpression());
                break;
            }
            case QueryTreeNodeType::JOIN:
            {
                auto & join_node = join_tree_node_to_process->as<JoinNode &>();
                join_tree_nodes_to_process.push_back(join_node.getLeftTableExpression());
                join_tree_nodes_to_process.push_back(join_node.getRightTableExpression());
                break;
            }
            default:
            {
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                    "Unexpected node type for table expression. "
                    "Expected table, table function, query, union, join or array join. Actual {}",
                    join_tree_node_to_process->getNodeTypeName());
            }
        }
    }

    return false;
}

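/// Merge multiple condition nodes into a single resolved "and" function node.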
QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context)
{
    auto function_node = std::make_shared<FunctionNode>("and");
    auto and_function = FunctionFactory::instance().get("and", context);
    function_node->getArguments().getNodes() = condition_nodes;
    function_node->resolveAsFunction(and_function->build(function_node->getArgumentColumns()));

    return function_node;
}

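/// Clone the query and replace the given table and table function expressions with StorageDummy tables
/// that keep the original storage snapshot and columns. Optionally records the replacements in result_replacement_map.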
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(
    const QueryTreeNodePtr & query_node,
    const QueryTreeNodes & table_nodes,
    const ContextPtr & context,
    ResultReplacementMap * result_replacement_map)
{
    std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;

    for (const auto & table_expression : table_nodes)
    {
        auto * table_node = table_expression->as<TableNode>();
        auto * table_function_node = table_expression->as<TableFunctionNode>();

        if (table_node || table_function_node)
        {
            const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
            auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();

            StoragePtr storage_dummy = std::make_shared<StorageDummy>(
                storage_snapshot->storage.getStorageID(),
                ColumnsDescription(storage_snapshot->getColumns(get_column_options)),
                storage_snapshot);

            auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);

            if (result_replacement_map)
                result_replacement_map->emplace(table_expression, dummy_table_node);

            dummy_table_node->setAlias(table_expression->getAlias());
            replacement_map.emplace(table_expression.get(), std::move(dummy_table_node));
        }
    }

    return query_node->cloneAndReplace(replacement_map);
}

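/// Build SelectQueryInfo (AST, query tree and planner context) for the given query tree.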
SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
{
    SelectQueryInfo select_query_info;
    select_query_info.query = queryNodeToSelectQuery(query_tree);
    select_query_info.query_tree = query_tree;
    select_query_info.planner_context = planner_context;
    return select_query_info;
}

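/// Build filter info from a filter expression given as an AST: the expression is turned into a query tree,
/// analyzed against the table expression and delegated to the query tree overload below.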
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
    const QueryTreeNodePtr & table_expression,
    PlannerContextPtr & planner_context,
    NameSet table_expression_required_names_without_filter)
{
    const auto & query_context = planner_context->getQueryContext();
    auto filter_query_tree = buildQueryTree(filter_expression, query_context);

    QueryAnalysisPass query_analysis_pass(table_expression);
    query_analysis_pass.run(filter_query_tree, query_context);

    return buildFilterInfo(std::move(filter_query_tree), table_expression, planner_context, std::move(table_expression_required_names_without_filter));
}

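/// Build filter info from an already analyzed filter query tree.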
FilterDAGInfo buildFilterInfo(QueryTreeNodePtr filter_query_tree,
    const QueryTreeNodePtr & table_expression,
    PlannerContextPtr & planner_context,
    NameSet table_expression_required_names_without_filter)
{
    if (table_expression_required_names_without_filter.empty())
    {
        auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
        const auto & table_expression_names = table_expression_data.getColumnNames();
        table_expression_required_names_without_filter.insert(table_expression_names.begin(), table_expression_names.end());
    }

    collectSourceColumns(filter_query_tree, planner_context, false /*keep_alias_columns*/);
    collectSets(filter_query_tree, *planner_context);

    ActionsDAG filter_actions_dag;

    PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
    auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
    if (expression_nodes.size() != 1)
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Filter actions must return single output node. Actual {}",
            expression_nodes.size());

    auto & filter_actions_outputs = filter_actions_dag.getOutputs();
    filter_actions_outputs = std::move(expression_nodes);

    std::string filter_node_name = filter_actions_outputs[0]->result_name;
    bool remove_filter_column = true;

    /// Keep columns that are still required after the filter as additional outputs,
    /// so they are not pruned together with the filter column.
    for (const auto & filter_input_node : filter_actions_dag.getInputs())
        if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
            filter_actions_outputs.push_back(filter_input_node);

    return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
}

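/// Parse the additional_result_filter setting into an expression AST; returns an empty ASTPtr if the setting is empty.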
ASTPtr parseAdditionalResultFilter(const Settings & settings)
{
    const String & additional_result_filter = settings.additional_result_filter;
    if (additional_result_filter.empty())
        return {};

    ParserExpression parser;
    auto additional_result_filter_ast = parseQuery(
        parser, additional_result_filter.data(), additional_result_filter.data() + additional_result_filter.size(),
        "additional result filter", settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
    return additional_result_filter_ast;
}

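/// Collect sets used by the actions DAG (from ColumnSet constants), including sets from nested indexHint actions, into useful_sets.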
void appendSetsFromActionsDAG(const ActionsDAG & dag, UsefulSets & useful_sets)
{
    for (const auto & node : dag.getNodes())
    {
        if (node.column)
        {
            const IColumn * column = node.column.get();
            if (const auto * column_const = typeid_cast<const ColumnConst *>(column))
                column = &column_const->getDataColumn();

            if (const auto * column_set = typeid_cast<const ColumnSet *>(column))
                useful_sets.insert(column_set->getData());
        }

        if (node.type == ActionsDAG::ActionType::FUNCTION && node.function_base->getName() == "indexHint")
        {
            ActionsDAG::NodeRawConstPtrs children;
            if (const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(node.function_base.get()))
            {
                if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get()))
                {
                    appendSetsFromActionsDAG(*index_hint->getActions(), useful_sets);
                }
            }
        }
    }
}

}