Analyzer storage Merge fixes

This commit is contained in:
Maksim Kita 2023-02-15 15:03:52 +01:00
parent b1ab2af7ad
commit 6b2adc1ec2
8 changed files with 132 additions and 124 deletions

View File

@ -125,7 +125,8 @@ public:
{
if (kind != FunctionKind::ORDINARY)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function node with name '{}' is not resolved as ordinary function");
"Function node with name '{}' is not resolved as ordinary function",
function_name);
return std::static_pointer_cast<const IFunctionBase>(function);
}

View File

@ -16,9 +16,11 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryTreePassManager.h>
#include <Analyzer/IdentifierNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/Utils.h>
#include <Interpreters/Context.h>
@ -108,6 +110,18 @@ void replaceStorageInQueryTree(QueryTreeNodePtr & query_tree, const ContextPtr &
}
auto replacement_table_expression = std::make_shared<TableNode>(storage, context);
std::optional<TableExpressionModifiers> table_expression_modifiers;
if (auto * table_node = table_expression_to_replace->as<TableNode>())
table_expression_modifiers = table_node->getTableExpressionModifiers();
else if (auto * table_function_node = table_expression_to_replace->as<TableFunctionNode>())
table_expression_modifiers = table_function_node->getTableExpressionModifiers();
else if (auto * identifier_node = table_expression_to_replace->as<IdentifierNode>())
table_expression_modifiers = identifier_node->getTableExpressionModifiers();
if (table_expression_modifiers)
replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers);
query_tree = query_tree->cloneAndReplace(table_expression_to_replace, std::move(replacement_table_expression));
}
@ -132,11 +146,6 @@ QueryTreeNodePtr buildQueryTreeAndRunPasses(const ASTPtr & query,
return query_tree;
}
/// Translate interpreter-level select options into the planner's own configuration struct.
/// Only the `only_analyze` flag is carried over.
PlannerConfiguration buildPlannerConfiguration(const SelectQueryOptions & select_query_options)
{
    PlannerConfiguration configuration;
    configuration.only_analyze = select_query_options.only_analyze;
    return configuration;
}
}
InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
@ -147,7 +156,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, nullptr /*storage*/))
, planner(query_tree, select_query_options, buildPlannerConfiguration(select_query_options))
, planner(query_tree, select_query_options)
{
}
@ -160,7 +169,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, storage_))
, planner(query_tree, select_query_options, buildPlannerConfiguration(select_query_options))
, planner(query_tree, select_query_options)
{
}
@ -172,7 +181,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(query_tree_)
, planner(query_tree, select_query_options, buildPlannerConfiguration(select_query_options))
, planner(query_tree, select_query_options)
{
}

View File

@ -979,34 +979,28 @@ PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node,
}
/// Construct a planner from a query tree and select query options.
/// The planner context is built here via buildPlannerContext, using a newly created GlobalPlannerContext.
/// NOTE(review): this listing shows pre-change and post-change lines of the commit side by side
/// (two closing parameter lines, and a planner_configuration initializer next to a signature without it);
/// confirm against the repository which set is current.
Planner::Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
PlannerConfiguration planner_configuration_)
const SelectQueryOptions & select_query_options_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)
, planner_context(buildPlannerContext(query_tree, select_query_options, std::make_shared<GlobalPlannerContext>()))
, planner_configuration(std::move(planner_configuration_))
{
}
/// Construct a planner from a query tree, select query options and a caller-supplied global planner context.
/// The global planner context is moved into buildPlannerContext, which produces this planner's context.
/// NOTE(review): this listing shows pre-change and post-change lines of the commit side by side
/// (the PlannerConfiguration parameter and its initializer appear next to the signature without them);
/// confirm against the repository which set is current.
Planner::Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
GlobalPlannerContextPtr global_planner_context_,
PlannerConfiguration planner_configuration_)
GlobalPlannerContextPtr global_planner_context_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)
, planner_context(buildPlannerContext(query_tree_, select_query_options, std::move(global_planner_context_)))
, planner_configuration(std::move(planner_configuration_))
{
}
/// Construct a planner from a query tree, select query options and an already built planner context.
/// The supplied planner context is stored as is (moved in); buildPlannerContext is not called here.
/// NOTE(review): this listing shows pre-change and post-change lines of the commit side by side
/// (the PlannerConfiguration parameter and its initializer appear next to the signature without them);
/// confirm against the repository which set is current.
Planner::Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
PlannerContextPtr planner_context_,
PlannerConfiguration planner_configuration_)
PlannerContextPtr planner_context_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)
, planner_context(std::move(planner_context_))
, planner_configuration(std::move(planner_configuration_))
{
}
@ -1015,7 +1009,7 @@ void Planner::buildQueryPlanIfNeeded()
if (query_plan.isInitialized())
return;
if (query_tree->as<UnionNode>())
if (query_tree->getNodeType() == QueryTreeNodeType::UNION)
buildPlanForUnionNode();
else
buildPlanForQueryNode();
@ -1174,7 +1168,7 @@ void Planner::buildPlanForQueryNode()
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
if (planner_configuration.only_analyze)
if (select_query_options.only_analyze)
{
Block join_tree_block;

View File

@ -16,30 +16,22 @@ using GlobalPlannerContextPtr = std::shared_ptr<GlobalPlannerContext>;
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
/// Plain aggregate of options that a caller can hand to a Planner constructor.
struct PlannerConfiguration
{
    /// Off by default; callers opt in explicitly (e.g. `PlannerConfiguration{.only_analyze = true}`).
    bool only_analyze{false};
};
class Planner
{
public:
/// Initialize planner with query tree after analysis phase
Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
PlannerConfiguration planner_configuration_ = {});
const SelectQueryOptions & select_query_options_);
/// Initialize planner with query tree after query analysis phase and global planner context
Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
GlobalPlannerContextPtr global_planner_context_,
PlannerConfiguration planner_configuration_ = {});
GlobalPlannerContextPtr global_planner_context_);
/// Initialize planner with query tree after query analysis phase and planner context
Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
PlannerContextPtr planner_context_,
PlannerConfiguration planner_configuration_ = {});
PlannerContextPtr planner_context_);
const QueryPlan & getQueryPlan() const
{
@ -69,7 +61,6 @@ private:
QueryPlan query_plan;
SelectQueryOptions select_query_options;
PlannerContextPtr planner_context;
PlannerConfiguration planner_configuration;
StorageLimitsList storage_limits;
};

View File

@ -370,9 +370,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(const QueryTreeNodePtr & tabl
else
{
Planner planner(select_query_info.query_tree,
SelectQueryOptions(from_stage),
select_query_info.planner_context,
PlannerConfiguration{.only_analyze = true});
SelectQueryOptions(from_stage).analyze(),
select_query_info.planner_context);
planner.buildQueryPlanIfNeeded();
auto expected_header = planner.getQueryPlan().getCurrentDataStream().header;

View File

@ -736,9 +736,7 @@ void StorageDistributed::read(
query_ast = queryNodeToSelectQuery(query_tree_with_replaced_distributed_table);
Planner planner(query_tree_with_replaced_distributed_table,
SelectQueryOptions(processed_stage),
PlannerConfiguration{.only_analyze = true});
Planner planner(query_tree_with_replaced_distributed_table, SelectQueryOptions(processed_stage).analyze());
planner.buildQueryPlanIfNeeded();
header = planner.getQueryPlan().getCurrentDataStream().header;

View File

@ -533,23 +533,34 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
{
const auto & [database_name, storage, storage_lock, table_name] = storage_with_lock_and_name;
const StorageID current_storage_id = storage->getStorageID();
bool is_storage_merge_engine = storage->as<StorageMerge>();
SelectQueryInfo modified_query_info = query_info;
if (modified_query_info.table_expression)
{
auto replacement_table_expression = std::make_shared<TableNode>(storage, storage_lock, storage_snapshot);
if (query_info.table_expression_modifiers)
replacement_table_expression->setTableExpressionModifiers(*query_info.table_expression_modifiers);
modified_query_info.query_tree = modified_query_info.query_tree->cloneAndReplace(modified_query_info.table_expression,
replacement_table_expression);
modified_query_info.table_expression = replacement_table_expression;
modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression);
if (!is_storage_merge_engine)
{
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
if (storage_snapshot->storage.supportsSubcolumns())
get_column_options.withSubcolumns();
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
if (!storage_snapshot->tryGetColumn(get_column_options, "_table"))
column_name_to_node.emplace("_table", std::make_shared<ConstantNode>(current_storage_id.table_name));
if (!storage_snapshot->tryGetColumn(get_column_options, "_database"))
column_name_to_node.emplace("_database", std::make_shared<ConstantNode>(current_storage_id.database_name));
if (!column_name_to_node.empty())
{
replaceColumns(modified_query_info.query_tree,
replacement_table_expression,
column_name_to_node);
@ -559,6 +570,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
}
else
{
bool is_storage_merge_engine = storage->as<StorageMerge>();
modified_query_info.query = query_info.query->clone();
/// Original query could contain JOIN but we need only the first joined table and its columns.
@ -585,7 +597,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
const Block & header,
const Aliases & aliases,
const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
Names real_column_names,
ContextMutablePtr modified_context,
size_t streams_num,
bool concat_streams)
@ -604,13 +616,82 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
modified_select.setFinal();
}
if (modified_context->getSettingsRef().allow_experimental_analyzer)
bool allow_experimental_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer;
auto storage_stage = storage->getQueryProcessingStage(modified_context,
QueryProcessingStage::Complete,
storage_snapshot,
modified_query_info);
if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns))
{
InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query,
modified_context,
SelectQueryOptions(processed_stage).ignoreProjections(),
storage);
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
QueryPlan plan;
StorageView * view = dynamic_cast<StorageView *>(storage.get());
if (!view || allow_experimental_analyzer)
{
storage->read(plan,
real_column_names,
storage_snapshot,
modified_query_info,
modified_context,
processed_stage,
max_block_size,
UInt32(streams_num));
}
else
{
/// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read.
/// The most intuitive way is to use InterpreterSelectQuery.
/// Intercept the settings
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
modified_context->setSetting("max_block_size", max_block_size);
InterpreterSelectQuery interpreter(modified_query_info.query,
modified_context,
storage,
view->getInMemoryMetadataPtr(),
SelectQueryOptions(processed_stage));
interpreter.buildQueryPlan(plan);
}
if (!plan.isInitialized())
return {};
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
read_from_merge_tree->addFilterNodes(added_filter_nodes);
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(modified_context),
BuildQueryPipelineSettings::fromContext(modified_context));
}
else if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
{
/// Maximum permissible parallelism is streams_num
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
if (allow_experimental_analyzer)
{
InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree,
modified_context,
SelectQueryOptions(processed_stage).ignoreProjections());
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
}
else
{
modified_select.replaceDatabaseAndTable(database_name, table_name);
/// TODO: Find a way to support projections for StorageMerge
InterpreterSelectQuery interpreter{modified_query_info.query,
modified_context,
SelectQueryOptions(processed_stage).ignoreProjections()};
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
}
/** Materialization is needed, since from distributed storage the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
@ -618,75 +699,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
*/
builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
}
else
{
auto storage_stage
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, storage_snapshot, modified_query_info);
if (processed_stage <= storage_stage)
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
QueryPlan plan;
if (StorageView * view = dynamic_cast<StorageView *>(storage.get()))
{
/// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read.
/// The most intuitive way is to use InterpreterSelectQuery.
/// Intercept the settings
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
modified_context->setSetting("max_block_size", max_block_size);
InterpreterSelectQuery(
modified_query_info.query, modified_context, storage, view->getInMemoryMetadataPtr(), SelectQueryOptions(processed_stage))
.buildQueryPlan(plan);
}
else
{
storage->read(
plan,
real_column_names,
storage_snapshot,
modified_query_info,
modified_context,
processed_stage,
max_block_size,
UInt32(streams_num));
}
if (!plan.isInitialized())
return {};
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
read_from_merge_tree->addFilterNodes(added_filter_nodes);
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(modified_context),
BuildQueryPipelineSettings::fromContext(modified_context));
}
else if (processed_stage > storage_stage)
{
modified_select.replaceDatabaseAndTable(database_name, table_name);
/// Maximum permissible parallelism is streams_num
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
/// TODO: Find a way to support projections for StorageMerge
InterpreterSelectQuery interpreter{
modified_query_info.query, modified_context, SelectQueryOptions(processed_stage).ignoreProjections()};
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
/** Materialization is needed, since from distributed storage the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
}
}
if (builder->initialized())
{
@ -739,7 +751,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder);
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage);
}
return builder;
@ -914,7 +926,8 @@ void ReadFromMerge::convertingSourceStream(
const StorageMetadataPtr & metadata_snapshot,
const Aliases & aliases,
ContextPtr local_context,
QueryPipelineBuilder & builder)
QueryPipelineBuilder & builder,
const QueryProcessingStage::Enum & processed_stage)
{
Block before_block_header = builder.getHeader();
@ -940,7 +953,7 @@ void ReadFromMerge::convertingSourceStream(
ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name;
if (local_context->getSettingsRef().allow_experimental_analyzer)
if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),

View File

@ -197,15 +197,18 @@ private:
const Block & header,
const Aliases & aliases,
const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
Names real_column_names,
ContextMutablePtr modified_context,
size_t streams_num,
bool concat_streams = false);
static void convertingSourceStream(
const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases,
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
const Aliases & aliases,
ContextPtr context,
QueryPipelineBuilder & builder);
QueryPipelineBuilder & builder,
const QueryProcessingStage::Enum & processed_stage);
};
}