mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
522 lines
20 KiB
C++
522 lines
20 KiB
C++
#include <Planner/Utils.h>
|
|
|
|
#include <Parsers/ASTSelectWithUnionQuery.h>
|
|
#include <Parsers/ASTSelectQuery.h>
|
|
#include <Parsers/ASTSubquery.h>
|
|
#include <Parsers/ExpressionListParsers.h>
|
|
#include <Parsers/parseQuery.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
#include <DataTypes/DataTypeLowCardinality.h>
|
|
#include <DataTypes/DataTypeNullable.h>
|
|
|
|
#include <Columns/getLeastSuperColumn.h>
|
|
|
|
#include <IO/WriteBufferFromString.h>
|
|
|
|
#include <Functions/FunctionFactory.h>
|
|
|
|
#include <Storages/StorageDummy.h>
|
|
|
|
#include <Interpreters/Context.h>
|
|
|
|
#include <Analyzer/Utils.h>
|
|
#include <Analyzer/ConstantNode.h>
|
|
#include <Analyzer/ColumnNode.h>
|
|
#include <Analyzer/FunctionNode.h>
|
|
#include <Analyzer/QueryNode.h>
|
|
#include <Analyzer/UnionNode.h>
|
|
#include <Analyzer/TableNode.h>
|
|
#include <Analyzer/TableFunctionNode.h>
|
|
#include <Analyzer/ArrayJoinNode.h>
|
|
#include <Analyzer/JoinNode.h>
|
|
#include <Analyzer/QueryTreeBuilder.h>
|
|
#include <Analyzer/Passes/QueryAnalysisPass.h>
|
|
|
|
#include <Planner/PlannerActionsVisitor.h>
|
|
#include <Planner/CollectTableExpressionData.h>
|
|
#include <Planner/CollectSets.h>
|
|
|
|
namespace DB
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int BAD_ARGUMENTS;
|
|
extern const int LOGICAL_ERROR;
|
|
extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
|
|
extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
|
|
}
|
|
|
|
String dumpQueryPlan(QueryPlan & query_plan)
|
|
{
|
|
WriteBufferFromOwnString query_plan_buffer;
|
|
query_plan.explainPlan(query_plan_buffer, QueryPlan::ExplainPlanOptions{true, true, true, true});
|
|
|
|
return query_plan_buffer.str();
|
|
}
|
|
|
|
String dumpQueryPipeline(QueryPlan & query_plan)
|
|
{
|
|
QueryPlan::ExplainPipelineOptions explain_pipeline;
|
|
WriteBufferFromOwnString query_pipeline_buffer;
|
|
query_plan.explainPipeline(query_pipeline_buffer, explain_pipeline);
|
|
|
|
return query_pipeline_buffer.str();
|
|
}
|
|
|
|
Block buildCommonHeaderForUnion(const Blocks & queries_headers, SelectUnionMode union_mode)
|
|
{
|
|
size_t num_selects = queries_headers.size();
|
|
Block common_header = queries_headers.front();
|
|
size_t columns_size = common_header.columns();
|
|
|
|
for (size_t query_number = 1; query_number < num_selects; ++query_number)
|
|
{
|
|
int error_code = 0;
|
|
|
|
if (union_mode == SelectUnionMode::UNION_DEFAULT ||
|
|
union_mode == SelectUnionMode::UNION_ALL ||
|
|
union_mode == SelectUnionMode::UNION_DISTINCT)
|
|
error_code = ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH;
|
|
else
|
|
error_code = ErrorCodes::INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
|
|
|
|
if (queries_headers.at(query_number).columns() != columns_size)
|
|
throw Exception(error_code,
|
|
"Different number of columns in {} elements: {} and {}",
|
|
toString(union_mode),
|
|
common_header.dumpNames(),
|
|
queries_headers[query_number].dumpNames());
|
|
}
|
|
|
|
std::vector<const ColumnWithTypeAndName *> columns(num_selects);
|
|
|
|
for (size_t column_number = 0; column_number < columns_size; ++column_number)
|
|
{
|
|
for (size_t i = 0; i < num_selects; ++i)
|
|
columns[i] = &queries_headers[i].getByPosition(column_number);
|
|
|
|
ColumnWithTypeAndName & result_element = common_header.getByPosition(column_number);
|
|
result_element = getLeastSuperColumn(columns);
|
|
}
|
|
|
|
return common_header;
|
|
}
|
|
|
|
ASTPtr queryNodeToSelectQuery(const QueryTreeNodePtr & query_node)
|
|
{
|
|
auto & query_node_typed = query_node->as<QueryNode &>();
|
|
|
|
// In case of cross-replication we don't know what database is used for the table.
|
|
// Each shard will use the default database (in the case of cross-replication shards may have different defaults).
|
|
auto result_ast = query_node_typed.toAST({ .qualify_indentifiers_with_database = false });
|
|
|
|
while (true)
|
|
{
|
|
if (auto * select_query = result_ast->as<ASTSelectQuery>())
|
|
break;
|
|
else if (auto * select_with_union = result_ast->as<ASTSelectWithUnionQuery>())
|
|
result_ast = select_with_union->list_of_selects->children.at(0);
|
|
else if (auto * subquery = result_ast->as<ASTSubquery>())
|
|
result_ast = subquery->children.at(0);
|
|
else
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query node invalid conversion to select query");
|
|
}
|
|
|
|
if (result_ast == nullptr)
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query node invalid conversion to select query");
|
|
|
|
return result_ast;
|
|
}
|
|
|
|
/** There are no limits on the maximum size of the result for the subquery.
|
|
* Since the result of the query is not the result of the entire query.
|
|
*/
|
|
void updateContextForSubqueryExecution(ContextMutablePtr & mutable_context)
|
|
{
|
|
/** The subquery in the IN / JOIN section does not have any restrictions on the maximum size of the result.
|
|
* Because the result of this query is not the result of the entire query.
|
|
* Constraints work instead
|
|
* max_rows_in_set, max_bytes_in_set, set_overflow_mode,
|
|
* max_rows_in_join, max_bytes_in_join, join_overflow_mode,
|
|
* which are checked separately (in the Set, Join objects).
|
|
*/
|
|
Settings subquery_settings = mutable_context->getSettings();
|
|
subquery_settings.max_result_rows = 0;
|
|
subquery_settings.max_result_bytes = 0;
|
|
/// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for whole query).
|
|
subquery_settings.extremes = false;
|
|
mutable_context->setSettings(subquery_settings);
|
|
}
|
|
|
|
namespace
|
|
{
|
|
|
|
StreamLocalLimits getLimitsForStorage(const Settings & settings, const SelectQueryOptions & options)
|
|
{
|
|
StreamLocalLimits limits;
|
|
limits.mode = LimitsMode::LIMITS_TOTAL;
|
|
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
|
|
limits.speed_limits.max_execution_time = settings.max_execution_time;
|
|
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
|
|
|
|
/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
|
|
* because the initiating server has a summary of the execution of the request on all servers.
|
|
*
|
|
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
|
|
* additionally on each remote server, because these limits are checked per block of data processed,
|
|
* and remote servers may process way more blocks of data than are received by initiator.
|
|
*
|
|
* The limits to throttle maximum execution speed is also checked on all servers.
|
|
*/
|
|
if (options.to_stage == QueryProcessingStage::Complete)
|
|
{
|
|
limits.speed_limits.min_execution_rps = settings.min_execution_speed;
|
|
limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
|
|
}
|
|
|
|
limits.speed_limits.max_execution_rps = settings.max_execution_speed;
|
|
limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
|
|
limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
|
|
|
|
return limits;
|
|
}
|
|
|
|
}
|
|
|
|
StorageLimits buildStorageLimits(const Context & context, const SelectQueryOptions & options)
|
|
{
|
|
const auto & settings = context.getSettingsRef();
|
|
|
|
StreamLocalLimits limits;
|
|
SizeLimits leaf_limits;
|
|
|
|
/// Set the limits and quota for reading data, the speed and time of the query.
|
|
if (!options.ignore_limits)
|
|
{
|
|
limits = getLimitsForStorage(settings, options);
|
|
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf, settings.read_overflow_mode_leaf);
|
|
}
|
|
|
|
return {limits, leaf_limits};
|
|
}
|
|
|
|
ActionsDAGPtr buildActionsDAGFromExpressionNode(const QueryTreeNodePtr & expression_node,
|
|
const ColumnsWithTypeAndName & input_columns,
|
|
const PlannerContextPtr & planner_context)
|
|
{
|
|
ActionsDAGPtr action_dag = std::make_shared<ActionsDAG>(input_columns);
|
|
PlannerActionsVisitor actions_visitor(planner_context);
|
|
auto expression_dag_index_nodes = actions_visitor.visit(action_dag, expression_node);
|
|
action_dag->getOutputs() = std::move(expression_dag_index_nodes);
|
|
|
|
return action_dag;
|
|
}
|
|
|
|
bool sortDescriptionIsPrefix(const SortDescription & prefix, const SortDescription & full)
|
|
{
|
|
size_t prefix_size = prefix.size();
|
|
if (prefix_size > full.size())
|
|
return false;
|
|
|
|
for (size_t i = 0; i < prefix_size; ++i)
|
|
{
|
|
if (full[i] != prefix[i])
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool queryHasArrayJoinInJoinTree(const QueryTreeNodePtr & query_node)
|
|
{
|
|
const auto & query_node_typed = query_node->as<const QueryNode &>();
|
|
|
|
std::vector<QueryTreeNodePtr> join_tree_nodes_to_process;
|
|
join_tree_nodes_to_process.push_back(query_node_typed.getJoinTree());
|
|
|
|
while (!join_tree_nodes_to_process.empty())
|
|
{
|
|
auto join_tree_node_to_process = join_tree_nodes_to_process.back();
|
|
join_tree_nodes_to_process.pop_back();
|
|
|
|
auto join_tree_node_type = join_tree_node_to_process->getNodeType();
|
|
|
|
switch (join_tree_node_type)
|
|
{
|
|
case QueryTreeNodeType::TABLE:
|
|
[[fallthrough]];
|
|
case QueryTreeNodeType::QUERY:
|
|
[[fallthrough]];
|
|
case QueryTreeNodeType::UNION:
|
|
[[fallthrough]];
|
|
case QueryTreeNodeType::TABLE_FUNCTION:
|
|
{
|
|
break;
|
|
}
|
|
case QueryTreeNodeType::ARRAY_JOIN:
|
|
{
|
|
return true;
|
|
}
|
|
case QueryTreeNodeType::JOIN:
|
|
{
|
|
auto & join_node = join_tree_node_to_process->as<JoinNode &>();
|
|
join_tree_nodes_to_process.push_back(join_node.getLeftTableExpression());
|
|
join_tree_nodes_to_process.push_back(join_node.getRightTableExpression());
|
|
break;
|
|
}
|
|
default:
|
|
{
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
|
"Unexpected node type for table expression. "
|
|
"Expected table, table function, query, union, join or array join. Actual {}",
|
|
join_tree_node_to_process->getNodeTypeName());
|
|
}
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
bool queryHasWithTotalsInAnySubqueryInJoinTree(const QueryTreeNodePtr & query_node)
|
|
{
|
|
const auto & query_node_typed = query_node->as<const QueryNode &>();
|
|
|
|
std::vector<QueryTreeNodePtr> join_tree_nodes_to_process;
|
|
join_tree_nodes_to_process.push_back(query_node_typed.getJoinTree());
|
|
|
|
while (!join_tree_nodes_to_process.empty())
|
|
{
|
|
auto join_tree_node_to_process = join_tree_nodes_to_process.back();
|
|
join_tree_nodes_to_process.pop_back();
|
|
|
|
auto join_tree_node_type = join_tree_node_to_process->getNodeType();
|
|
|
|
switch (join_tree_node_type)
|
|
{
|
|
case QueryTreeNodeType::TABLE:
|
|
[[fallthrough]];
|
|
case QueryTreeNodeType::TABLE_FUNCTION:
|
|
{
|
|
break;
|
|
}
|
|
case QueryTreeNodeType::QUERY:
|
|
{
|
|
auto & query_node_to_process = join_tree_node_to_process->as<QueryNode &>();
|
|
if (query_node_to_process.isGroupByWithTotals())
|
|
return true;
|
|
|
|
join_tree_nodes_to_process.push_back(query_node_to_process.getJoinTree());
|
|
break;
|
|
}
|
|
case QueryTreeNodeType::UNION:
|
|
{
|
|
auto & union_node = join_tree_node_to_process->as<UnionNode &>();
|
|
auto & union_queries = union_node.getQueries().getNodes();
|
|
|
|
for (auto & union_query : union_queries)
|
|
join_tree_nodes_to_process.push_back(union_query);
|
|
break;
|
|
}
|
|
case QueryTreeNodeType::ARRAY_JOIN:
|
|
{
|
|
auto & array_join_node = join_tree_node_to_process->as<ArrayJoinNode &>();
|
|
join_tree_nodes_to_process.push_back(array_join_node.getTableExpression());
|
|
break;
|
|
}
|
|
case QueryTreeNodeType::JOIN:
|
|
{
|
|
auto & join_node = join_tree_node_to_process->as<JoinNode &>();
|
|
join_tree_nodes_to_process.push_back(join_node.getLeftTableExpression());
|
|
join_tree_nodes_to_process.push_back(join_node.getRightTableExpression());
|
|
break;
|
|
}
|
|
default:
|
|
{
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
|
"Unexpected node type for table expression. "
|
|
"Expected table, table function, query, union, join or array join. Actual {}",
|
|
join_tree_node_to_process->getNodeTypeName());
|
|
}
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
QueryTreeNodePtr mergeConditionNodes(const QueryTreeNodes & condition_nodes, const ContextPtr & context)
|
|
{
|
|
auto function_node = std::make_shared<FunctionNode>("and");
|
|
auto and_function = FunctionFactory::instance().get("and", context);
|
|
function_node->getArguments().getNodes() = condition_nodes;
|
|
function_node->resolveAsFunction(and_function->build(function_node->getArgumentColumns()));
|
|
|
|
return function_node;
|
|
}
|
|
|
|
QueryTreeNodePtr replaceTableExpressionsWithDummyTables(const QueryTreeNodePtr & query_node,
|
|
const ContextPtr & context,
|
|
//PlannerContext & planner_context,
|
|
ResultReplacementMap * result_replacement_map)
|
|
{
|
|
auto & query_node_typed = query_node->as<QueryNode &>();
|
|
auto table_expressions = extractTableExpressions(query_node_typed.getJoinTree());
|
|
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
|
|
size_t subquery_index = 0;
|
|
|
|
for (auto & table_expression : table_expressions)
|
|
{
|
|
auto * table_node = table_expression->as<TableNode>();
|
|
auto * table_function_node = table_expression->as<TableFunctionNode>();
|
|
auto * subquery_node = table_expression->as<QueryNode>();
|
|
auto * union_node = table_expression->as<UnionNode>();
|
|
|
|
StoragePtr storage_dummy;
|
|
|
|
if (table_node || table_function_node)
|
|
{
|
|
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
|
|
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
|
|
|
|
storage_dummy
|
|
= std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(), ColumnsDescription(storage_snapshot->getColumns(get_column_options)));
|
|
}
|
|
else if (subquery_node || union_node)
|
|
{
|
|
const auto & subquery_projection_columns
|
|
= subquery_node ? subquery_node->getProjectionColumns() : union_node->computeProjectionColumns();
|
|
|
|
NameSet unique_column_names;
|
|
NamesAndTypes storage_dummy_columns;
|
|
storage_dummy_columns.reserve(subquery_projection_columns.size());
|
|
|
|
for (const auto & projection_column : subquery_projection_columns)
|
|
{
|
|
auto [_, inserted] = unique_column_names.insert(projection_column.name);
|
|
if (inserted)
|
|
storage_dummy_columns.emplace_back(projection_column);
|
|
}
|
|
|
|
storage_dummy = std::make_shared<StorageDummy>(StorageID{"dummy", "subquery_" + std::to_string(subquery_index)}, ColumnsDescription::fromNamesAndTypes(storage_dummy_columns));
|
|
++subquery_index;
|
|
}
|
|
|
|
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
|
|
|
|
if (result_replacement_map)
|
|
result_replacement_map->emplace(table_expression, dummy_table_node);
|
|
|
|
dummy_table_node->setAlias(table_expression->getAlias());
|
|
|
|
// auto & src_table_expression_data = planner_context.getOrCreateTableExpressionData(table_expression);
|
|
// auto & dst_table_expression_data = planner_context.getOrCreateTableExpressionData(dummy_table_node);
|
|
|
|
// dst_table_expression_data = src_table_expression_data;
|
|
|
|
replacement_map.emplace(table_expression.get(), std::move(dummy_table_node));
|
|
}
|
|
|
|
return query_node->cloneAndReplace(replacement_map);
|
|
}
|
|
|
|
QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTypes & columns,
|
|
const QueryTreeNodePtr & table_expression,
|
|
const ContextPtr & context)
|
|
{
|
|
auto projection_columns = columns;
|
|
|
|
QueryTreeNodes subquery_projection_nodes;
|
|
subquery_projection_nodes.reserve(projection_columns.size());
|
|
|
|
for (const auto & column : projection_columns)
|
|
subquery_projection_nodes.push_back(std::make_shared<ColumnNode>(column, table_expression));
|
|
|
|
if (subquery_projection_nodes.empty())
|
|
{
|
|
auto constant_data_type = std::make_shared<DataTypeUInt64>();
|
|
subquery_projection_nodes.push_back(std::make_shared<ConstantNode>(1UL, constant_data_type));
|
|
projection_columns.push_back({"1", std::move(constant_data_type)});
|
|
}
|
|
|
|
auto context_copy = Context::createCopy(context);
|
|
updateContextForSubqueryExecution(context_copy);
|
|
|
|
auto query_node = std::make_shared<QueryNode>(std::move(context_copy));
|
|
|
|
query_node->getProjection().getNodes() = std::move(subquery_projection_nodes);
|
|
query_node->resolveProjectionColumns(projection_columns);
|
|
query_node->getJoinTree() = table_expression;
|
|
query_node->setIsSubquery(true);
|
|
|
|
return query_node;
|
|
}
|
|
|
|
SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context)
|
|
{
|
|
SelectQueryInfo select_query_info;
|
|
select_query_info.query = queryNodeToSelectQuery(query_tree);
|
|
select_query_info.query_tree = query_tree;
|
|
select_query_info.planner_context = planner_context;
|
|
return select_query_info;
|
|
}
|
|
|
|
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
|
|
const QueryTreeNodePtr & table_expression,
|
|
PlannerContextPtr & planner_context,
|
|
NameSet table_expression_required_names_without_filter)
|
|
{
|
|
const auto & query_context = planner_context->getQueryContext();
|
|
|
|
auto filter_query_tree = buildQueryTree(filter_expression, query_context);
|
|
|
|
QueryAnalysisPass query_analysis_pass(table_expression);
|
|
query_analysis_pass.run(filter_query_tree, query_context);
|
|
|
|
if (table_expression_required_names_without_filter.empty())
|
|
{
|
|
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
|
|
const auto & table_expression_names = table_expression_data.getColumnNames();
|
|
table_expression_required_names_without_filter.insert(table_expression_names.begin(), table_expression_names.end());
|
|
}
|
|
|
|
collectSourceColumns(filter_query_tree, planner_context);
|
|
collectSets(filter_query_tree, *planner_context);
|
|
|
|
auto filter_actions_dag = std::make_shared<ActionsDAG>();
|
|
|
|
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
|
|
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
|
|
if (expression_nodes.size() != 1)
|
|
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
|
"Filter actions must return single output node. Actual {}",
|
|
expression_nodes.size());
|
|
|
|
auto & filter_actions_outputs = filter_actions_dag->getOutputs();
|
|
filter_actions_outputs = std::move(expression_nodes);
|
|
|
|
std::string filter_node_name = filter_actions_outputs[0]->result_name;
|
|
bool remove_filter_column = true;
|
|
|
|
for (const auto & filter_input_node : filter_actions_dag->getInputs())
|
|
if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
|
|
filter_actions_outputs.push_back(filter_input_node);
|
|
|
|
return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
|
|
}
|
|
|
|
ASTPtr parseAdditionalResultFilter(const Settings & settings)
|
|
{
|
|
const String & additional_result_filter = settings.additional_result_filter;
|
|
if (additional_result_filter.empty())
|
|
return {};
|
|
|
|
ParserExpression parser;
|
|
auto additional_result_filter_ast = parseQuery(
|
|
parser, additional_result_filter.data(), additional_result_filter.data() + additional_result_filter.size(),
|
|
"additional result filter", settings.max_query_size, settings.max_parser_depth);
|
|
return additional_result_filter_ast;
|
|
}
|
|
|
|
}
|