mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Analyzer support distributed JOINS and subqueries in IN functions
This commit is contained in:
parent
8e4112b1ce
commit
a762112e15
@ -2,18 +2,21 @@
|
||||
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/FieldVisitorToString.h>
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <Analyzer/ConstantNode.h>
|
||||
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
#include <DataTypes/IDataType.h>
|
||||
#include <DataTypes/DataTypeSet.h>
|
||||
|
||||
#include <Parsers/ASTFunction.h>
|
||||
|
||||
#include <Functions/IFunction.h>
|
||||
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
#include <Analyzer/Utils.h>
|
||||
#include <Analyzer/ConstantNode.h>
|
||||
#include <Analyzer/IdentifierNode.h>
|
||||
|
||||
namespace DB
|
||||
@ -44,17 +47,28 @@ const DataTypes & FunctionNode::getArgumentTypes() const
|
||||
ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
|
||||
{
|
||||
const auto & arguments = getArguments().getNodes();
|
||||
size_t arguments_size = arguments.size();
|
||||
|
||||
ColumnsWithTypeAndName argument_columns;
|
||||
argument_columns.reserve(arguments.size());
|
||||
|
||||
for (const auto & arg : arguments)
|
||||
for (size_t i = 0; i < arguments_size; ++i)
|
||||
{
|
||||
ColumnWithTypeAndName argument;
|
||||
argument.type = arg->getResultType();
|
||||
if (auto * constant = arg->as<ConstantNode>())
|
||||
argument.column = argument.type->createColumnConst(1, constant->getValue());
|
||||
argument_columns.push_back(std::move(argument));
|
||||
const auto & argument = arguments[i];
|
||||
|
||||
ColumnWithTypeAndName argument_column;
|
||||
|
||||
if (isNameOfInFunction(function_name) && i == 1)
|
||||
argument_column.type = std::make_shared<DataTypeSet>();
|
||||
else
|
||||
argument_column.type = argument->getResultType();
|
||||
|
||||
if (auto * constant = argument->as<ConstantNode>())
|
||||
argument_column.column = argument_column.type->createColumnConst(1, constant->getValue());
|
||||
|
||||
argument_columns.push_back(std::move(argument_column));
|
||||
}
|
||||
|
||||
return argument_columns;
|
||||
}
|
||||
|
||||
|
@ -99,8 +99,9 @@ class InDepthQueryTreeVisitorWithContext
|
||||
public:
|
||||
using VisitQueryTreeNodeType = std::conditional_t<const_visitor, const QueryTreeNodePtr, QueryTreeNodePtr>;
|
||||
|
||||
explicit InDepthQueryTreeVisitorWithContext(ContextPtr context)
|
||||
explicit InDepthQueryTreeVisitorWithContext(ContextPtr context, size_t initial_subquery_depth = 0)
|
||||
: current_context(std::move(context))
|
||||
, subquery_depth(initial_subquery_depth)
|
||||
{}
|
||||
|
||||
/// Return true if visitor should traverse tree top to bottom, false otherwise
|
||||
@ -125,11 +126,17 @@ public:
|
||||
return current_context->getSettingsRef();
|
||||
}
|
||||
|
||||
size_t getSubqueryDepth() const
|
||||
{
|
||||
return subquery_depth;
|
||||
}
|
||||
|
||||
void visit(VisitQueryTreeNodeType & query_tree_node)
|
||||
{
|
||||
auto current_scope_context_ptr = current_context;
|
||||
SCOPE_EXIT(
|
||||
current_context = std::move(current_scope_context_ptr);
|
||||
--subquery_depth;
|
||||
);
|
||||
|
||||
if (auto * query_node = query_tree_node->template as<QueryNode>())
|
||||
@ -137,6 +144,8 @@ public:
|
||||
else if (auto * union_node = query_tree_node->template as<UnionNode>())
|
||||
current_context = union_node->getContext();
|
||||
|
||||
++subquery_depth;
|
||||
|
||||
bool traverse_top_to_bottom = getDerived().shouldTraverseTopToBottom();
|
||||
if (!traverse_top_to_bottom)
|
||||
visitChildren(query_tree_node);
|
||||
@ -145,7 +154,12 @@ public:
|
||||
|
||||
if (traverse_top_to_bottom)
|
||||
visitChildren(query_tree_node);
|
||||
|
||||
getDerived().leaveImpl(query_tree_node);
|
||||
}
|
||||
|
||||
void leaveImpl(VisitQueryTreeNodeType & node [[maybe_unused]])
|
||||
{}
|
||||
private:
|
||||
Derived & getDerived()
|
||||
{
|
||||
@ -172,6 +186,7 @@ private:
|
||||
}
|
||||
|
||||
ContextPtr current_context;
|
||||
size_t subquery_depth = 0;
|
||||
};
|
||||
|
||||
template <typename Derived>
|
||||
|
@ -106,6 +106,12 @@ public:
|
||||
return locality;
|
||||
}
|
||||
|
||||
/// Set join locality
|
||||
void setLocality(JoinLocality locality_value)
|
||||
{
|
||||
locality = locality_value;
|
||||
}
|
||||
|
||||
/// Get join strictness
|
||||
JoinStrictness getStrictness() const
|
||||
{
|
||||
|
@ -42,7 +42,7 @@ private:
|
||||
return;
|
||||
|
||||
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
|
||||
bool is_final_supported = storage && storage->supportsFinal() && !storage->isRemote();
|
||||
bool is_final_supported = storage && storage->supportsFinal();
|
||||
if (!is_final_supported)
|
||||
return;
|
||||
|
||||
|
@ -2914,7 +2914,10 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromTableExpression(const Id
|
||||
break;
|
||||
|
||||
IdentifierLookup column_identifier_lookup = {qualified_identifier_with_removed_part, IdentifierLookupContext::EXPRESSION};
|
||||
if (tryBindIdentifierToAliases(column_identifier_lookup, scope) ||
|
||||
if (tryBindIdentifierToAliases(column_identifier_lookup, scope))
|
||||
break;
|
||||
|
||||
if (table_expression_data.should_qualify_columns &&
|
||||
tryBindIdentifierToTableExpressions(column_identifier_lookup, table_expression_node, scope))
|
||||
break;
|
||||
|
||||
|
@ -79,6 +79,56 @@ bool isNameOfInFunction(const std::string & function_name)
|
||||
return is_special_function_in;
|
||||
}
|
||||
|
||||
bool isNameOfLocalInFunction(const std::string & function_name)
|
||||
{
|
||||
bool is_special_function_in = function_name == "in" ||
|
||||
function_name == "notIn" ||
|
||||
function_name == "nullIn" ||
|
||||
function_name == "notNullIn" ||
|
||||
function_name == "inIgnoreSet" ||
|
||||
function_name == "notInIgnoreSet" ||
|
||||
function_name == "nullInIgnoreSet" ||
|
||||
function_name == "notNullInIgnoreSet";
|
||||
|
||||
return is_special_function_in;
|
||||
}
|
||||
|
||||
bool isNameOfGlobalInFunction(const std::string & function_name)
|
||||
{
|
||||
bool is_special_function_in = function_name == "globalIn" ||
|
||||
function_name == "globalNotIn" ||
|
||||
function_name == "globalNullIn" ||
|
||||
function_name == "globalNotNullIn" ||
|
||||
function_name == "globalInIgnoreSet" ||
|
||||
function_name == "globalNotInIgnoreSet" ||
|
||||
function_name == "globalNullInIgnoreSet" ||
|
||||
function_name == "globalNotNullInIgnoreSet";
|
||||
|
||||
return is_special_function_in;
|
||||
}
|
||||
|
||||
std::string getGlobalInFunctionNameForLocalInFunctionName(const std::string & function_name)
|
||||
{
|
||||
if (function_name == "in")
|
||||
return "globalIn";
|
||||
else if (function_name == "notIn")
|
||||
return "globalNotIn";
|
||||
else if (function_name == "nullIn")
|
||||
return "globalNullIn";
|
||||
else if (function_name == "notNullIn")
|
||||
return "globalNotNullIn";
|
||||
else if (function_name == "inIgnoreSet")
|
||||
return "globalInIgnoreSet";
|
||||
else if (function_name == "notInIgnoreSet")
|
||||
return "globalNotInIgnoreSet";
|
||||
else if (function_name == "nullInIgnoreSet")
|
||||
return "globalNullInIgnoreSet";
|
||||
else if (function_name == "notNullInIgnoreSet")
|
||||
return "globalNotNullInIgnoreSet";
|
||||
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid local IN function name {}", function_name);
|
||||
}
|
||||
|
||||
QueryTreeNodePtr buildCastFunction(const QueryTreeNodePtr & expression,
|
||||
const DataTypePtr & type,
|
||||
const ContextPtr & context,
|
||||
|
@ -13,6 +13,15 @@ bool isNodePartOfTree(const IQueryTreeNode * node, const IQueryTreeNode * root);
|
||||
/// Returns true if function name is name of IN function or its variations, false otherwise
|
||||
bool isNameOfInFunction(const std::string & function_name);
|
||||
|
||||
/// Returns true if function name is name of local IN function or its variations, false otherwise
|
||||
bool isNameOfLocalInFunction(const std::string & function_name);
|
||||
|
||||
/// Returns true if function name is name of global IN function or its variations, false otherwise
|
||||
bool isNameOfGlobalInFunction(const std::string & function_name);
|
||||
|
||||
/// Returns global in function name for local in function name
|
||||
std::string getGlobalInFunctionNameForLocalInFunctionName(const std::string & function_name);
|
||||
|
||||
/** Build cast function that cast expression into type.
|
||||
* If resolve = true, then result cast function is resolved during build, otherwise
|
||||
* result cast function is not resolved during build.
|
||||
|
@ -45,7 +45,7 @@ bool GlobalPlannerContext::hasColumnIdentifier(const ColumnIdentifier & column_i
|
||||
return column_identifiers.contains(column_identifier);
|
||||
}
|
||||
|
||||
PlannerContext::PlannerContext(ContextPtr query_context_, GlobalPlannerContextPtr global_planner_context_)
|
||||
PlannerContext::PlannerContext(ContextMutablePtr query_context_, GlobalPlannerContextPtr global_planner_context_)
|
||||
: query_context(std::move(query_context_))
|
||||
, global_planner_context(std::move(global_planner_context_))
|
||||
{}
|
||||
|
@ -88,16 +88,22 @@ class PlannerContext
|
||||
{
|
||||
public:
|
||||
/// Create planner context with query context and global planner context
|
||||
PlannerContext(ContextPtr query_context_, GlobalPlannerContextPtr global_planner_context_);
|
||||
PlannerContext(ContextMutablePtr query_context_, GlobalPlannerContextPtr global_planner_context_);
|
||||
|
||||
/// Get planner context query context
|
||||
const ContextPtr & getQueryContext() const
|
||||
ContextPtr getQueryContext() const
|
||||
{
|
||||
return query_context;
|
||||
}
|
||||
|
||||
/// Get planner context query context
|
||||
ContextPtr & getQueryContext()
|
||||
/// Get planner context mutable query context
|
||||
const ContextMutablePtr & getMutableQueryContext() const
|
||||
{
|
||||
return query_context;
|
||||
}
|
||||
|
||||
/// Get planner context mutable query context
|
||||
ContextMutablePtr & getMutableQueryContext()
|
||||
{
|
||||
return query_context;
|
||||
}
|
||||
@ -137,12 +143,18 @@ public:
|
||||
*/
|
||||
TableExpressionData * getTableExpressionDataOrNull(const QueryTreeNodePtr & table_expression_node);
|
||||
|
||||
/// Get table expression node to data read only map
|
||||
/// Get table expression node to data map
|
||||
const std::unordered_map<QueryTreeNodePtr, TableExpressionData> & getTableExpressionNodeToData() const
|
||||
{
|
||||
return table_expression_node_to_data;
|
||||
}
|
||||
|
||||
/// Get table expression node to data map
|
||||
std::unordered_map<QueryTreeNodePtr, TableExpressionData> & getTableExpressionNodeToData()
|
||||
{
|
||||
return table_expression_node_to_data;
|
||||
}
|
||||
|
||||
/** Get column node identifier.
|
||||
* For column node source check if table expression data is registered.
|
||||
* If table expression data is not registered exception is thrown.
|
||||
@ -184,7 +196,7 @@ public:
|
||||
|
||||
private:
|
||||
/// Query context
|
||||
ContextPtr query_context;
|
||||
ContextMutablePtr query_context;
|
||||
|
||||
/// Global planner context
|
||||
GlobalPlannerContextPtr global_planner_context;
|
||||
|
@ -246,17 +246,87 @@ bool applyTrivialCountIfPossible(
|
||||
return true;
|
||||
}
|
||||
|
||||
JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & table_expression,
|
||||
const SelectQueryInfo & select_query_info,
|
||||
const SelectQueryOptions & select_query_options,
|
||||
PlannerContextPtr & planner_context,
|
||||
bool is_single_table_expression)
|
||||
void prepareBuildQueryPlanForTableExpression(const QueryTreeNodePtr & table_expression, PlannerContextPtr & planner_context)
|
||||
{
|
||||
const auto & query_context = planner_context->getQueryContext();
|
||||
const auto & settings = query_context->getSettingsRef();
|
||||
|
||||
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
|
||||
auto columns_names = table_expression_data.getColumnNames();
|
||||
|
||||
auto * table_node = table_expression->as<TableNode>();
|
||||
auto * table_function_node = table_expression->as<TableFunctionNode>();
|
||||
auto * query_node = table_expression->as<QueryNode>();
|
||||
auto * union_node = table_expression->as<UnionNode>();
|
||||
|
||||
/** The current user must have the SELECT privilege.
|
||||
* We do not check access rights for table functions because they have been already checked in ITableFunction::execute().
|
||||
*/
|
||||
if (table_node)
|
||||
{
|
||||
auto column_names_with_aliases = columns_names;
|
||||
const auto & alias_columns_names = table_expression_data.getAliasColumnsNames();
|
||||
column_names_with_aliases.insert(column_names_with_aliases.end(), alias_columns_names.begin(), alias_columns_names.end());
|
||||
checkAccessRights(*table_node, column_names_with_aliases, query_context);
|
||||
}
|
||||
|
||||
if (columns_names.empty())
|
||||
{
|
||||
NameAndTypePair additional_column_to_read;
|
||||
|
||||
if (table_node || table_function_node)
|
||||
{
|
||||
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
|
||||
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
|
||||
additional_column_to_read = chooseSmallestColumnToReadFromStorage(storage, storage_snapshot);
|
||||
|
||||
}
|
||||
else if (query_node || union_node)
|
||||
{
|
||||
const auto & projection_columns = query_node ? query_node->getProjectionColumns() : union_node->computeProjectionColumns();
|
||||
NamesAndTypesList projection_columns_list(projection_columns.begin(), projection_columns.end());
|
||||
additional_column_to_read = ExpressionActions::getSmallestColumn(projection_columns_list);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected table, table function, query or union. Actual {}",
|
||||
table_expression->formatASTForErrorMessage());
|
||||
}
|
||||
|
||||
auto & global_planner_context = planner_context->getGlobalPlannerContext();
|
||||
const auto & column_identifier = global_planner_context->createColumnIdentifier(additional_column_to_read, table_expression);
|
||||
columns_names.push_back(additional_column_to_read.name);
|
||||
table_expression_data.addColumn(additional_column_to_read, column_identifier);
|
||||
}
|
||||
|
||||
/// Limitation on the number of columns to read
|
||||
if (settings.max_columns_to_read && columns_names.size() > settings.max_columns_to_read)
|
||||
throw Exception(ErrorCodes::TOO_MANY_COLUMNS,
|
||||
"Limit for number of columns to read exceeded. Requested: {}, maximum: {}",
|
||||
columns_names.size(),
|
||||
settings.max_columns_to_read);
|
||||
}
|
||||
|
||||
JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
|
||||
const SelectQueryInfo & select_query_info,
|
||||
const SelectQueryOptions & select_query_options,
|
||||
PlannerContextPtr & planner_context,
|
||||
bool is_single_table_expression,
|
||||
bool wrap_read_columns_in_subquery)
|
||||
{
|
||||
const auto & query_context = planner_context->getQueryContext();
|
||||
const auto & settings = query_context->getSettingsRef();
|
||||
|
||||
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
|
||||
|
||||
QueryProcessingStage::Enum from_stage = QueryProcessingStage::Enum::FetchColumns;
|
||||
|
||||
if (wrap_read_columns_in_subquery)
|
||||
{
|
||||
auto columns = table_expression_data.getColumns();
|
||||
table_expression = buildSubqueryToReadColumnsFromTableExpression(columns, table_expression, query_context);
|
||||
}
|
||||
|
||||
auto * table_node = table_expression->as<TableNode>();
|
||||
auto * table_function_node = table_expression->as<TableFunctionNode>();
|
||||
auto * query_node = table_expression->as<QueryNode>();
|
||||
@ -264,8 +334,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
|
||||
|
||||
QueryPlan query_plan;
|
||||
|
||||
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
|
||||
|
||||
if (table_node || table_function_node)
|
||||
{
|
||||
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
|
||||
@ -362,32 +430,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
|
||||
|
||||
auto columns_names = table_expression_data.getColumnNames();
|
||||
|
||||
/** The current user must have the SELECT privilege.
|
||||
* We do not check access rights for table functions because they have been already checked in ITableFunction::execute().
|
||||
*/
|
||||
if (table_node)
|
||||
{
|
||||
auto column_names_with_aliases = columns_names;
|
||||
const auto & alias_columns_names = table_expression_data.getAliasColumnsNames();
|
||||
column_names_with_aliases.insert(column_names_with_aliases.end(), alias_columns_names.begin(), alias_columns_names.end());
|
||||
checkAccessRights(*table_node, column_names_with_aliases, planner_context->getQueryContext());
|
||||
}
|
||||
|
||||
/// Limitation on the number of columns to read
|
||||
if (settings.max_columns_to_read && columns_names.size() > settings.max_columns_to_read)
|
||||
throw Exception(ErrorCodes::TOO_MANY_COLUMNS,
|
||||
"Limit for number of columns to read exceeded. Requested: {}, maximum: {}",
|
||||
columns_names.size(),
|
||||
settings.max_columns_to_read);
|
||||
|
||||
if (columns_names.empty())
|
||||
{
|
||||
auto additional_column_to_read = chooseSmallestColumnToReadFromStorage(storage, storage_snapshot);
|
||||
const auto & column_identifier = planner_context->getGlobalPlannerContext()->createColumnIdentifier(additional_column_to_read, table_expression);
|
||||
columns_names.push_back(additional_column_to_read.name);
|
||||
table_expression_data.addColumn(additional_column_to_read, column_identifier);
|
||||
}
|
||||
|
||||
bool need_rewrite_query_with_final = storage->needRewriteQueryWithFinal(columns_names);
|
||||
if (need_rewrite_query_with_final)
|
||||
{
|
||||
@ -464,16 +506,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
|
||||
}
|
||||
else
|
||||
{
|
||||
if (table_expression_data.getColumnNames().empty())
|
||||
{
|
||||
const auto & projection_columns = query_node ? query_node->getProjectionColumns() : union_node->computeProjectionColumns();
|
||||
NamesAndTypesList projection_columns_list(projection_columns.begin(), projection_columns.end());
|
||||
auto additional_column_to_read = ExpressionActions::getSmallestColumn(projection_columns_list);
|
||||
|
||||
const auto & column_identifier = planner_context->getGlobalPlannerContext()->createColumnIdentifier(additional_column_to_read, table_expression);
|
||||
table_expression_data.addColumn(additional_column_to_read, column_identifier);
|
||||
}
|
||||
|
||||
auto subquery_options = select_query_options.subquery();
|
||||
Planner subquery_planner(table_expression, subquery_options, planner_context->getGlobalPlannerContext());
|
||||
/// Propagate storage limits to subquery
|
||||
@ -516,10 +548,11 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
|
||||
planner.buildQueryPlanIfNeeded();
|
||||
|
||||
auto expected_header = planner.getQueryPlan().getCurrentDataStream().header;
|
||||
materializeBlockInplace(expected_header);
|
||||
|
||||
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, expected_header))
|
||||
{
|
||||
materializeBlockInplace(expected_header);
|
||||
|
||||
auto rename_actions_dag = ActionsDAG::makeConvertingActions(
|
||||
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
|
||||
expected_header.getColumnsWithTypeAndName(),
|
||||
@ -1059,14 +1092,40 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
|
||||
const ColumnIdentifierSet & outer_scope_columns,
|
||||
PlannerContextPtr & planner_context)
|
||||
{
|
||||
const auto & query_node_typed = query_node->as<QueryNode &>();
|
||||
auto table_expressions_stack = buildTableExpressionsStack(query_node_typed.getJoinTree());
|
||||
auto table_expressions_stack = buildTableExpressionsStack(query_node->as<QueryNode &>().getJoinTree());
|
||||
size_t table_expressions_stack_size = table_expressions_stack.size();
|
||||
bool is_single_table_expression = table_expressions_stack_size == 1;
|
||||
|
||||
std::vector<ColumnIdentifierSet> table_expressions_outer_scope_columns(table_expressions_stack_size);
|
||||
ColumnIdentifierSet current_outer_scope_columns = outer_scope_columns;
|
||||
|
||||
/// For each table, table function, query, union table expressions prepare before query plan build
|
||||
for (size_t i = 0; i < table_expressions_stack_size; ++i)
|
||||
{
|
||||
const auto & table_expression = table_expressions_stack[i];
|
||||
auto table_expression_type = table_expression->getNodeType();
|
||||
if (table_expression_type == QueryTreeNodeType::JOIN ||
|
||||
table_expression_type == QueryTreeNodeType::ARRAY_JOIN)
|
||||
continue;
|
||||
|
||||
prepareBuildQueryPlanForTableExpression(table_expression, planner_context);
|
||||
}
|
||||
|
||||
/** If left most table expression query plan is planned to stage that is not equal to fetch columns,
|
||||
* then left most table expression is responsible for providing valid JOIN TREE part of final query plan.
|
||||
*
|
||||
* Examples: Distributed, LiveView, Merge storages.
|
||||
*/
|
||||
auto left_table_expression = table_expressions_stack.front();
|
||||
auto left_table_expression_query_plan = buildQueryPlanForTableExpression(left_table_expression,
|
||||
select_query_info,
|
||||
select_query_options,
|
||||
planner_context,
|
||||
is_single_table_expression,
|
||||
false /*wrap_read_columns_in_subquery*/);
|
||||
if (left_table_expression_query_plan.from_stage != QueryProcessingStage::FetchColumns)
|
||||
return left_table_expression_query_plan;
|
||||
|
||||
for (Int64 i = static_cast<Int64>(table_expressions_stack_size) - 1; i >= 0; --i)
|
||||
{
|
||||
table_expressions_outer_scope_columns[i] = current_outer_scope_columns;
|
||||
@ -1120,19 +1179,22 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
|
||||
if (table_expression_data.isRemote() && i != 0)
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"JOIN with multiple remote storages is unsupported");
|
||||
if (table_expression == left_table_expression)
|
||||
{
|
||||
query_plans_stack.push_back(std::move(left_table_expression_query_plan));
|
||||
continue;
|
||||
}
|
||||
|
||||
/** If table expression is remote and it is not left most table expression, we wrap read columns from such
|
||||
* table expression in subquery.
|
||||
*/
|
||||
bool is_remote = planner_context->getTableExpressionDataOrThrow(table_expression).isRemote();
|
||||
query_plans_stack.push_back(buildQueryPlanForTableExpression(table_expression,
|
||||
select_query_info,
|
||||
select_query_options,
|
||||
planner_context,
|
||||
is_single_table_expression));
|
||||
|
||||
if (query_plans_stack.back().from_stage != QueryProcessingStage::FetchColumns)
|
||||
break;
|
||||
is_single_table_expression,
|
||||
is_remote /*wrap_read_columns_in_subquery*/));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,6 +101,17 @@ public:
|
||||
return column_names;
|
||||
}
|
||||
|
||||
NamesAndTypes getColumns() const
|
||||
{
|
||||
NamesAndTypes result;
|
||||
result.reserve(column_names.size());
|
||||
|
||||
for (const auto & column_name : column_names)
|
||||
result.push_back(column_name_to_column.at(column_name));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
ColumnIdentifiers getColumnIdentifiers() const
|
||||
{
|
||||
ColumnIdentifiers result;
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
|
||||
@ -19,6 +20,7 @@
|
||||
|
||||
#include <Analyzer/Utils.h>
|
||||
#include <Analyzer/ConstantNode.h>
|
||||
#include <Analyzer/ColumnNode.h>
|
||||
#include <Analyzer/FunctionNode.h>
|
||||
#include <Analyzer/QueryNode.h>
|
||||
#include <Analyzer/UnionNode.h>
|
||||
@ -391,4 +393,36 @@ QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNo
|
||||
return query_node->cloneAndReplace(replacement_map);
|
||||
}
|
||||
|
||||
QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTypes & columns,
|
||||
const QueryTreeNodePtr & table_expression,
|
||||
const ContextPtr & context)
|
||||
{
|
||||
auto projection_columns = columns;
|
||||
|
||||
QueryTreeNodes subquery_projection_nodes;
|
||||
subquery_projection_nodes.reserve(projection_columns.size());
|
||||
|
||||
for (const auto & column : projection_columns)
|
||||
subquery_projection_nodes.push_back(std::make_shared<ColumnNode>(column, table_expression));
|
||||
|
||||
if (subquery_projection_nodes.empty())
|
||||
{
|
||||
auto constant_data_type = std::make_shared<DataTypeUInt64>();
|
||||
subquery_projection_nodes.push_back(std::make_shared<ConstantNode>(1UL, constant_data_type));
|
||||
projection_columns.push_back({"1", std::move(constant_data_type)});
|
||||
}
|
||||
|
||||
auto context_copy = Context::createCopy(context);
|
||||
updateContextForSubqueryExecution(context_copy);
|
||||
|
||||
auto query_node = std::make_shared<QueryNode>(std::move(context_copy));
|
||||
|
||||
query_node->resolveProjectionColumns(projection_columns);
|
||||
query_node->getProjection().getNodes() = std::move(subquery_projection_nodes);
|
||||
query_node->getJoinTree() = table_expression;
|
||||
query_node->setIsSubquery(true);
|
||||
|
||||
return query_node;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -72,4 +72,9 @@ QueryTreeNodePtr replaceTablesAndTableFunctionsWithDummyTables(const QueryTreeNo
|
||||
const ContextPtr & context,
|
||||
ResultReplacementMap * result_replacement_map = nullptr);
|
||||
|
||||
/// Build subquery to read specified columns from table expression
|
||||
QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTypes & columns,
|
||||
const QueryTreeNodePtr & table_expression,
|
||||
const ContextPtr & context);
|
||||
|
||||
}
|
||||
|
@ -39,11 +39,16 @@
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/IAST.h>
|
||||
|
||||
#include <Analyzer/Utils.h>
|
||||
#include <Analyzer/ColumnNode.h>
|
||||
#include <Analyzer/FunctionNode.h>
|
||||
#include <Analyzer/TableNode.h>
|
||||
#include <Analyzer/TableFunctionNode.h>
|
||||
#include <Analyzer/QueryNode.h>
|
||||
#include <Analyzer/JoinNode.h>
|
||||
#include <Analyzer/QueryTreeBuilder.h>
|
||||
#include <Analyzer/Passes/QueryAnalysisPass.h>
|
||||
#include <Analyzer/InDepthQueryTreeVisitor.h>
|
||||
|
||||
#include <Planner/Planner.h>
|
||||
#include <Planner/Utils.h>
|
||||
@ -55,6 +60,7 @@
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/InterpreterDescribeQuery.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/JoinedTables.h>
|
||||
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
|
||||
@ -69,12 +75,14 @@
|
||||
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
|
||||
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionView.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
|
||||
#include <Storages/IStorageCluster.h>
|
||||
|
||||
#include <Processors/Executors/PushingPipelineExecutor.h>
|
||||
#include <Processors/Executors/CompletedPipelineExecutor.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
||||
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
||||
@ -138,6 +146,7 @@ namespace ErrorCodes
|
||||
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
|
||||
extern const int DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED;
|
||||
}
|
||||
|
||||
namespace ActionLocks
|
||||
@ -634,12 +643,265 @@ StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery(
|
||||
namespace
|
||||
{
|
||||
|
||||
QueryTreeNodePtr buildQueryTreeDistributedTableReplacedWithLocalTable(const SelectQueryInfo & query_info,
|
||||
/// Visitor that collect column source to columns mapping from query and all subqueries
|
||||
class CollectColumnSourceToColumnsVisitor : public InDepthQueryTreeVisitor<CollectColumnSourceToColumnsVisitor>
|
||||
{
|
||||
public:
|
||||
struct Columns
|
||||
{
|
||||
NameSet column_names;
|
||||
NamesAndTypes columns;
|
||||
|
||||
void addColumn(NameAndTypePair column)
|
||||
{
|
||||
if (column_names.contains(column.name))
|
||||
return;
|
||||
|
||||
column_names.insert(column.name);
|
||||
columns.push_back(std::move(column));
|
||||
}
|
||||
};
|
||||
|
||||
const std::unordered_map<QueryTreeNodePtr, Columns> & getColumnSourceToColumns() const
|
||||
{
|
||||
return column_source_to_columns;
|
||||
}
|
||||
|
||||
void visitImpl(QueryTreeNodePtr & node)
|
||||
{
|
||||
auto * column_node = node->as<ColumnNode>();
|
||||
if (!column_node)
|
||||
return;
|
||||
|
||||
auto column_source = column_node->getColumnSourceOrNull();
|
||||
if (!column_source)
|
||||
return;
|
||||
|
||||
auto it = column_source_to_columns.find(column_source);
|
||||
if (it == column_source_to_columns.end())
|
||||
{
|
||||
auto [insert_it, _] = column_source_to_columns.emplace(column_source, Columns());
|
||||
it = insert_it;
|
||||
}
|
||||
|
||||
it->second.addColumn(column_node->getColumn());
|
||||
}
|
||||
|
||||
private:
|
||||
std::unordered_map<QueryTreeNodePtr, Columns> column_source_to_columns;
|
||||
};
|
||||
|
||||
/** Visitor that rewrites IN and JOINs in query and all subqueries according to distributed_product_mode and
|
||||
* prefer_global_in_and_join settings.
|
||||
*
|
||||
* Additionally collects GLOBAL JOIN and GLOBAL IN query nodes.
|
||||
*
|
||||
* If distributed_product_mode = deny, then visitor throws exception if there are multiple distributed tables.
|
||||
* If distributed_product_mode = local, then visitor collects replacement map for tables that must be replaced
|
||||
* with local tables.
|
||||
* If distributed_product_mode = global or prefer_global_in_and_join setting is true, then visitor rewrites JOINs and IN functions that
|
||||
* contain distributed tables to GLOBAL JOINs and GLOBAL IN functions.
|
||||
* If distributed_product_mode = allow, then visitor does not rewrite query if there are multiple distributed tables.
|
||||
*/
|
||||
class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>
|
||||
{
|
||||
public:
|
||||
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
|
||||
using Base::Base;
|
||||
|
||||
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
|
||||
: Base(context_)
|
||||
{}
|
||||
|
||||
struct InFunctionOrJoin
|
||||
{
|
||||
QueryTreeNodePtr query_node;
|
||||
size_t subquery_depth = 0;
|
||||
};
|
||||
|
||||
const std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> & getReplacementMap() const
|
||||
{
|
||||
return replacement_map;
|
||||
}
|
||||
|
||||
const std::vector<InFunctionOrJoin> & getGlobalInOrJoinNodes() const
|
||||
{
|
||||
return global_in_or_join_nodes;
|
||||
}
|
||||
|
||||
bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
|
||||
{
|
||||
auto * function_node = parent->as<FunctionNode>();
|
||||
if (function_node && isNameOfGlobalInFunction(function_node->getFunctionName()))
|
||||
return false;
|
||||
|
||||
auto * join_node = parent->as<JoinNode>();
|
||||
if (join_node && join_node->getLocality() == JoinLocality::Global && join_node->getRightTableExpression() == child)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void visitImpl(QueryTreeNodePtr & node)
|
||||
{
|
||||
auto * function_node = node->as<FunctionNode>();
|
||||
auto * join_node = node->as<JoinNode>();
|
||||
|
||||
if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
|
||||
(join_node && join_node->getLocality() == JoinLocality::Global))
|
||||
{
|
||||
InFunctionOrJoin in_function_or_join_entry;
|
||||
in_function_or_join_entry.query_node = node;
|
||||
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
|
||||
global_in_or_join_nodes.push_back(std::move(in_function_or_join_entry));
|
||||
return;
|
||||
}
|
||||
|
||||
if ((function_node && isNameOfLocalInFunction(function_node->getFunctionName())) ||
|
||||
(join_node && join_node->getLocality() != JoinLocality::Global))
|
||||
{
|
||||
InFunctionOrJoin in_function_or_join_entry;
|
||||
in_function_or_join_entry.query_node = node;
|
||||
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
|
||||
in_function_or_join_stack.push_back(in_function_or_join_entry);
|
||||
return;
|
||||
}
|
||||
|
||||
if (node->getNodeType() == QueryTreeNodeType::TABLE)
|
||||
tryRewriteTableNodeIfNeeded(node);
|
||||
}
|
||||
|
||||
void leaveImpl(QueryTreeNodePtr & node)
|
||||
{
|
||||
if (!in_function_or_join_stack.empty() && node.get() == in_function_or_join_stack.back().query_node.get())
|
||||
in_function_or_join_stack.pop_back();
|
||||
}
|
||||
|
||||
private:
|
||||
void tryRewriteTableNodeIfNeeded(const QueryTreeNodePtr & table_node)
|
||||
{
|
||||
const auto & table_node_typed = table_node->as<TableNode &>();
|
||||
const auto * distributed_storage = typeid_cast<const StorageDistributed *>(table_node_typed.getStorage().get());
|
||||
if (!distributed_storage)
|
||||
return;
|
||||
|
||||
bool distributed_valid_for_rewrite = distributed_storage->getShardCount() >= 2;
|
||||
if (!distributed_valid_for_rewrite)
|
||||
return;
|
||||
|
||||
auto distributed_product_mode = getSettings().distributed_product_mode;
|
||||
|
||||
if (distributed_product_mode == DistributedProductMode::LOCAL)
|
||||
{
|
||||
StorageID remote_storage_id = StorageID{distributed_storage->getRemoteDatabaseName(),
|
||||
distributed_storage->getRemoteTableName()};
|
||||
auto resolved_remote_storage_id = getContext()->resolveStorageID(remote_storage_id);
|
||||
const auto & distributed_storage_columns = table_node_typed.getStorageSnapshot()->metadata->getColumns();
|
||||
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_columns);
|
||||
auto replacement_table_expression = std::make_shared<TableNode>(std::move(storage), getContext());
|
||||
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
|
||||
}
|
||||
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings().prefer_global_in_and_join) &&
|
||||
!in_function_or_join_stack.empty())
|
||||
{
|
||||
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
|
||||
|
||||
if (auto * in_function_to_modify = in_or_join_node_to_modify->as<FunctionNode>())
|
||||
{
|
||||
auto global_in_function_name = getGlobalInFunctionNameForLocalInFunctionName(in_function_to_modify->getFunctionName());
|
||||
auto global_in_function_resolver = FunctionFactory::instance().get(global_in_function_name, getContext());
|
||||
in_function_to_modify->resolveAsFunction(global_in_function_resolver->build(in_function_to_modify->getArgumentColumns()));
|
||||
}
|
||||
else if (auto * join_node_to_modify = in_or_join_node_to_modify->as<JoinNode>())
|
||||
{
|
||||
join_node_to_modify->setLocality(JoinLocality::Global);
|
||||
}
|
||||
|
||||
global_in_or_join_nodes.push_back(in_function_or_join_stack.back());
|
||||
}
|
||||
else if (distributed_product_mode == DistributedProductMode::ALLOW)
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if (distributed_product_mode == DistributedProductMode::DENY)
|
||||
{
|
||||
throw Exception(ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED,
|
||||
"Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny'). "
|
||||
"You may rewrite query to use local tables "
|
||||
"in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.");
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<InFunctionOrJoin> in_function_or_join_stack;
|
||||
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
|
||||
std::vector<InFunctionOrJoin> global_in_or_join_nodes;
|
||||
};
|
||||
|
||||
/** Execute subquery node and put result in mutable context temporary table.
|
||||
* Returns table node that is initialized with temporary table storage.
|
||||
*/
|
||||
QueryTreeNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
|
||||
ContextMutablePtr & mutable_context,
|
||||
size_t subquery_depth)
|
||||
{
|
||||
auto subquery_hash = subquery_node->getTreeHash();
|
||||
String temporary_table_name = fmt::format("_data_{}_{}", subquery_hash.first, subquery_hash.second);
|
||||
|
||||
const auto & external_tables = mutable_context->getExternalTables();
|
||||
auto external_table_it = external_tables.find(temporary_table_name);
|
||||
if (external_table_it != external_tables.end())
|
||||
{
|
||||
auto temporary_table_expression_node = std::make_shared<TableNode>(external_table_it->second, mutable_context);
|
||||
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
|
||||
return temporary_table_expression_node;
|
||||
}
|
||||
|
||||
auto subquery_ast = subquery_node->toAST();
|
||||
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth, true /*is_subquery*/);
|
||||
auto context_copy = Context::createCopy(mutable_context);
|
||||
updateContextForSubqueryExecution(context_copy);
|
||||
|
||||
InterpreterSelectQueryAnalyzer interpreter(subquery_ast, context_copy, subquery_options);
|
||||
|
||||
Block sample = interpreter.getSampleBlock();
|
||||
NamesAndTypesList columns = sample.getNamesAndTypesList();
|
||||
|
||||
auto external_storage_holder = TemporaryTableHolder(
|
||||
mutable_context,
|
||||
ColumnsDescription{columns},
|
||||
ConstraintsDescription{},
|
||||
nullptr /*query*/,
|
||||
true /*create_for_global_subquery*/);
|
||||
|
||||
StoragePtr external_storage = external_storage_holder.getTable();
|
||||
auto temporary_table_expression_node = std::make_shared<TableNode>(external_storage, mutable_context);
|
||||
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
|
||||
|
||||
auto table_out = external_storage->write({}, external_storage->getInMemoryMetadataPtr(), mutable_context);
|
||||
auto io = interpreter.execute();
|
||||
io.pipeline.complete(std::move(table_out));
|
||||
CompletedPipelineExecutor executor(io.pipeline);
|
||||
executor.execute();
|
||||
|
||||
mutable_context->addExternalTable(temporary_table_name, std::move(external_storage_holder));
|
||||
|
||||
return temporary_table_expression_node;
|
||||
}
|
||||
|
||||
QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
|
||||
const StorageSnapshotPtr & distributed_storage_snapshot,
|
||||
const StorageID & remote_storage_id,
|
||||
const ASTPtr & remote_table_function)
|
||||
{
|
||||
const auto & query_context = query_info.planner_context->getQueryContext();
|
||||
auto & planner_context = query_info.planner_context;
|
||||
const auto & query_context = planner_context->getQueryContext();
|
||||
|
||||
std::optional<TableExpressionModifiers> table_expression_modifiers;
|
||||
|
||||
if (auto * query_info_table_node = query_info.table_expression->as<TableNode>())
|
||||
table_expression_modifiers = query_info_table_node->getTableExpressionModifiers();
|
||||
else if (auto * query_info_table_function_node = query_info.table_expression->as<TableFunctionNode>())
|
||||
table_expression_modifiers = query_info_table_function_node->getTableExpressionModifiers();
|
||||
|
||||
QueryTreeNodePtr replacement_table_expression;
|
||||
|
||||
@ -651,6 +913,9 @@ QueryTreeNodePtr buildQueryTreeDistributedTableReplacedWithLocalTable(const Sele
|
||||
auto table_function_node = std::make_shared<TableFunctionNode>(remote_table_function_node.getFunctionName());
|
||||
table_function_node->getArgumentsNode() = remote_table_function_node.getArgumentsNode();
|
||||
|
||||
if (table_expression_modifiers)
|
||||
table_function_node->setTableExpressionModifiers(*table_expression_modifiers);
|
||||
|
||||
QueryAnalysisPass query_analysis_pass;
|
||||
query_analysis_pass.run(table_function_node, query_context);
|
||||
|
||||
@ -660,13 +925,85 @@ QueryTreeNodePtr buildQueryTreeDistributedTableReplacedWithLocalTable(const Sele
|
||||
{
|
||||
auto resolved_remote_storage_id = query_context->resolveStorageID(remote_storage_id);
|
||||
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_snapshot->metadata->getColumns());
|
||||
auto table_node = std::make_shared<TableNode>(std::move(storage), query_context);
|
||||
|
||||
replacement_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
|
||||
if (table_expression_modifiers)
|
||||
table_node->setTableExpressionModifiers(*table_expression_modifiers);
|
||||
|
||||
replacement_table_expression = std::move(table_node);
|
||||
}
|
||||
|
||||
replacement_table_expression->setAlias(query_info.table_expression->getAlias());
|
||||
|
||||
return query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression));
|
||||
auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression));
|
||||
|
||||
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
|
||||
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);
|
||||
|
||||
const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();
|
||||
|
||||
DistributedProductModeRewriteInJoinVisitor visitor(query_info.planner_context->getQueryContext());
|
||||
visitor.visit(query_tree_to_modify);
|
||||
|
||||
auto replacement_map = visitor.getReplacementMap();
|
||||
const auto & global_in_or_join_nodes = visitor.getGlobalInOrJoinNodes();
|
||||
|
||||
for (const auto & global_in_or_join_node : global_in_or_join_nodes)
|
||||
{
|
||||
if (auto * join_node = global_in_or_join_node.query_node->as<JoinNode>())
|
||||
{
|
||||
auto join_right_table_expression = join_node->getRightTableExpression();
|
||||
auto join_right_table_expression_node_type = join_right_table_expression->getNodeType();
|
||||
|
||||
QueryTreeNodePtr subquery_node;
|
||||
|
||||
if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY ||
|
||||
join_right_table_expression_node_type == QueryTreeNodeType::UNION)
|
||||
{
|
||||
subquery_node = join_right_table_expression;
|
||||
}
|
||||
else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE ||
|
||||
join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
|
||||
{
|
||||
const auto & columns = column_source_to_columns.at(join_right_table_expression).columns;
|
||||
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns,
|
||||
join_right_table_expression,
|
||||
planner_context->getQueryContext());
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Expected JOIN righ table expression to be table, table function, query or union node. Actual {}",
|
||||
join_right_table_expression->formatASTForErrorMessage());
|
||||
}
|
||||
|
||||
auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
|
||||
planner_context->getMutableQueryContext(),
|
||||
global_in_or_join_node.subquery_depth);
|
||||
temporary_table_expression_node->setAlias(join_right_table_expression->getAlias());
|
||||
replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node));
|
||||
continue;
|
||||
}
|
||||
else if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
|
||||
{
|
||||
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
|
||||
auto temporary_table_expression_node = executeSubqueryNode(in_function_subquery_node,
|
||||
planner_context->getMutableQueryContext(),
|
||||
global_in_or_join_node.subquery_depth);
|
||||
in_function_subquery_node = std::move(temporary_table_expression_node);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Expected global IN or JOIN query node. Actual {}",
|
||||
global_in_or_join_node.query_node->formatASTForErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (!replacement_map.empty())
|
||||
query_tree_to_modify = query_tree_to_modify->cloneAndReplace(replacement_map);
|
||||
|
||||
return query_tree_to_modify;
|
||||
}
|
||||
|
||||
}
|
||||
@ -694,17 +1031,13 @@ void StorageDistributed::read(
|
||||
if (!remote_table_function_ptr)
|
||||
remote_storage_id = StorageID{remote_database, remote_table};
|
||||
|
||||
auto query_tree_with_replaced_distributed_table = buildQueryTreeDistributedTableReplacedWithLocalTable(query_info,
|
||||
auto query_tree_distributed = buildQueryTreeDistributed(query_info,
|
||||
storage_snapshot,
|
||||
remote_storage_id,
|
||||
remote_table_function_ptr);
|
||||
|
||||
query_ast = queryNodeToSelectQuery(query_tree_with_replaced_distributed_table);
|
||||
|
||||
Planner planner(query_tree_with_replaced_distributed_table, SelectQueryOptions(processed_stage).analyze());
|
||||
planner.buildQueryPlanIfNeeded();
|
||||
|
||||
header = planner.getQueryPlan().getCurrentDataStream().header;
|
||||
query_ast = queryNodeToSelectQuery(query_tree_distributed);
|
||||
header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_ast, local_context, SelectQueryOptions(processed_stage).analyze());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <Interpreters/TransactionLog.h>
|
||||
#include <Interpreters/ClusterProxy/executeQuery.h>
|
||||
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
|
||||
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <Parsers/ASTCheckQuery.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
@ -223,8 +224,12 @@ void StorageMergeTree::read(
|
||||
|
||||
auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
|
||||
|
||||
Block header =
|
||||
InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
Block header;
|
||||
|
||||
if (local_context->getSettingsRef().allow_experimental_analyzer)
|
||||
header = InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze());
|
||||
else
|
||||
header = InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
|
||||
ClusterProxy::SelectStreamFactory select_stream_factory =
|
||||
ClusterProxy::SelectStreamFactory(
|
||||
|
@ -167,7 +167,7 @@ void StorageView::read(
|
||||
query_plan.addStep(std::move(materializing));
|
||||
|
||||
/// And also convert to expected structure.
|
||||
const auto & expected_header = storage_snapshot->getSampleBlockForColumns(column_names,parameter_values);
|
||||
const auto & expected_header = storage_snapshot->getSampleBlockForColumns(column_names, parameter_values);
|
||||
const auto & header = query_plan.getCurrentDataStream().header;
|
||||
|
||||
const auto * select_with_union = current_inner_query->as<ASTSelectWithUnionQuery>();
|
||||
|
@ -1 +1,3 @@
|
||||
1
|
||||
1
|
||||
1
|
||||
|
@ -1,3 +1,4 @@
|
||||
set allow_experimental_analyzer = 1;
|
||||
set distributed_product_mode = 'local';
|
||||
|
||||
drop table if exists shard1;
|
||||
@ -21,7 +22,7 @@ where distr1.id in
|
||||
from distr1
|
||||
join distr2 on distr1.id = distr2.id
|
||||
where distr1.id > 0
|
||||
); -- { serverError 288 }
|
||||
);
|
||||
|
||||
select distinct(d0.id) from distr1 d0
|
||||
where d0.id in
|
||||
@ -32,15 +33,14 @@ where d0.id in
|
||||
where d1.id > 0
|
||||
);
|
||||
|
||||
-- TODO
|
||||
--select distinct(distr1.id) from distr1
|
||||
--where distr1.id in
|
||||
--(
|
||||
-- select distr1.id
|
||||
-- from distr1 as d1
|
||||
-- join distr2 as d2 on distr1.id = distr2.id
|
||||
-- where distr1.id > 0
|
||||
--);
|
||||
select distinct(distr1.id) from distr1
|
||||
where distr1.id in
|
||||
(
|
||||
select distr1.id
|
||||
from distr1 as d1
|
||||
join distr2 as d2 on distr1.id = distr2.id
|
||||
where distr1.id > 0
|
||||
);
|
||||
|
||||
drop table shard1;
|
||||
drop table shard2;
|
||||
|
@ -108,9 +108,6 @@ select left_table.id,val_left, val_middle, val_right from left_table
|
||||
ORDER BY left_table.id, val_left, val_middle, val_right;
|
||||
1 c a c
|
||||
1 c b c
|
||||
-- no distributed tests because it is not currently supported:
|
||||
-- JOIN with remote storages is unsupported.
|
||||
|
||||
-- Quite exotic with Merge engine
|
||||
DROP TABLE IF EXISTS table_to_merge_a;
|
||||
DROP TABLE IF EXISTS table_to_merge_b;
|
||||
|
@ -79,9 +79,6 @@ select left_table.id,val_left, val_middle, val_right from left_table
|
||||
inner join (SELECT * FROM right_table WHERE id = 1) r on middle_table.id = r.id
|
||||
ORDER BY left_table.id, val_left, val_middle, val_right;
|
||||
|
||||
-- no distributed tests because it is not currently supported:
|
||||
-- JOIN with remote storages is unsupported.
|
||||
|
||||
-- Quite exotic with Merge engine
|
||||
DROP TABLE IF EXISTS table_to_merge_a;
|
||||
DROP TABLE IF EXISTS table_to_merge_b;
|
||||
|
Loading…
Reference in New Issue
Block a user