Merge pull request #31044 from kssenii/fix-storage-merge-with-aliases-and-where

Fix StorageMerge with aliases and where
Anton Popov 2021-11-11 21:46:02 +03:00 committed by GitHub
commit 4c6c80b9a2
4 changed files with 154 additions and 41 deletions
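For context, the bug addressed here appears when a Merge table sits over tables that define the same column as an ALIAS with different expressions and the query filters on that column. A minimal sketch of the scenario, adapted from the test added in this PR (only two of the four test tables are used; the exact pre-fix failure mode is not shown in this diff):

CREATE TABLE tt1 (a UInt32, b UInt32 ALIAS a) ENGINE = Memory;
CREATE TABLE tt2 (a UInt32, b UInt32 ALIAS a * 2) ENGINE = Memory;
CREATE TABLE tt_m (a UInt32, b UInt32) ENGINE = Merge(currentDatabase(), 'tt1|tt2');
INSERT INTO tt1 VALUES (1);
INSERT INTO tt2 VALUES (2);
-- With the fix, the alias is resolved per underlying table, so this returns (1, 1) and (2, 4).
SELECT * FROM tt_m WHERE b != 0 ORDER BY b;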

src/Storages/StorageMerge.cpp

@@ -180,6 +180,28 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
}
SelectQueryInfo StorageMerge::getModifiedQueryInfo(
const SelectQueryInfo & query_info, ContextPtr modified_context, const StorageID & current_storage_id, bool is_merge_engine)
{
SelectQueryInfo modified_query_info = query_info;
modified_query_info.query = query_info.query->clone();
/// Original query could contain JOIN but we need only the first joined table and its columns.
auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();
TreeRewriterResult new_analyzer_res = *modified_query_info.syntax_analyzer_result;
removeJoin(modified_select, new_analyzer_res, modified_context);
modified_query_info.syntax_analyzer_result = std::make_shared<TreeRewriterResult>(std::move(new_analyzer_res));
if (!is_merge_engine)
{
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", current_storage_id.table_name);
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_database", current_storage_id.database_name);
}
return modified_query_info;
}
Pipe StorageMerge::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@@ -222,10 +244,12 @@ Pipe StorageMerge::read(
= getSelectedTables(local_context, query_info.query, has_database_virtual_column, has_table_virtual_column);
if (selected_tables.empty())
{
auto modified_query_info = getModifiedQueryInfo(query_info, modified_context, getStorageID(), false);
/// FIXME: do we support sampling in this case?
return createSources(
{},
query_info,
modified_query_info,
processed_stage,
max_block_size,
header,
@@ -236,6 +260,7 @@ Pipe StorageMerge::read(
0,
has_database_virtual_column,
has_table_virtual_column);
}
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
@@ -264,7 +289,6 @@ Pipe StorageMerge::read(
}
auto sample_block = getInMemoryMetadataPtr()->getSampleBlock();
Names required_columns;
for (const auto & table : selected_tables)
{
@@ -283,12 +307,16 @@ Pipe StorageMerge::read(
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_columns = storage_metadata_snapshot->getColumns();
if (processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty())
{
auto syntax_result = TreeRewriter(local_context).analyzeSelect(query_info.query, TreeRewriterResult({}, storage, storage_metadata_snapshot));
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
auto modified_query_info = getModifiedQueryInfo(query_info, modified_context, storage->getStorageID(), storage->as<StorageMerge>());
auto syntax_result = TreeRewriter(local_context).analyzeSelect(modified_query_info.query, TreeRewriterResult({}, storage, storage_metadata_snapshot));
Names column_names_as_aliases;
bool with_aliases = processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
if (with_aliases)
{
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
for (const auto & column : real_column_names)
{
const auto column_default = storage_columns.getDefault(column);
@@ -314,21 +342,24 @@ Pipe StorageMerge::read(
required_columns_expr_list->children.emplace_back(std::move(column_expr));
}
syntax_result = TreeRewriter(local_context).analyze(required_columns_expr_list, storage_columns.getAllPhysical(),
storage, storage_metadata_snapshot);
syntax_result = TreeRewriter(local_context).analyze(
required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage_metadata_snapshot);
auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, local_context).getActionsDAG(true);
required_columns = alias_actions->getRequiredColumns().getNames();
column_names_as_aliases = alias_actions->getRequiredColumns().getNames();
if (column_names_as_aliases.empty())
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()));
}
auto source_pipe = createSources(
storage_metadata_snapshot,
query_info,
modified_query_info,
processed_stage,
max_block_size,
header,
aliases,
table,
required_columns.empty() ? real_column_names : required_columns,
column_names_as_aliases.empty() ? real_column_names : column_names_as_aliases,
modified_context,
current_streams,
has_database_virtual_column,
@@ -350,7 +381,7 @@ Pipe StorageMerge::read(
Pipe StorageMerge::createSources(
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
SelectQueryInfo & modified_query_info,
const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size,
const Block & header,
@@ -364,19 +395,8 @@ Pipe StorageMerge::createSources(
bool concat_streams)
{
const auto & [database_name, storage, struct_lock, table_name] = storage_with_lock;
SelectQueryInfo modified_query_info = query_info;
modified_query_info.query = query_info.query->clone();
/// Original query could contain JOIN but we need only the first joined table and its columns.
auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();
TreeRewriterResult new_analyzer_res = *query_info.syntax_analyzer_result;
removeJoin(modified_select, new_analyzer_res, modified_context);
modified_query_info.syntax_analyzer_result = std::make_shared<TreeRewriterResult>(std::move(new_analyzer_res));
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", table_name);
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_database", database_name);
Pipe pipe;
if (!storage)
@@ -710,27 +730,30 @@ void StorageMerge::convertingSourceStream(
if (!where_expression)
return;
for (size_t column_index : collections::range(0, header.columns()))
if (processed_stage > QueryProcessingStage::FetchColumns)
{
ColumnWithTypeAndName header_column = header.getByPosition(column_index);
ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
/// If processed_stage is greater than FetchColumns and the block structure differs between streams,
/// the WHERE expression may be invalid because of convertingBlockInputStream,
/// so we need to throw an exception.
if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
for (size_t column_index : collections::range(0, header.columns()))
{
NamesAndTypesList source_columns = metadata_snapshot->getSampleBlock().getNamesAndTypesList();
auto virtual_column = *getVirtuals().tryGetByName("_table");
source_columns.emplace_back(NameAndTypePair{virtual_column.name, virtual_column.type});
auto syntax_result = TreeRewriter(local_context).analyze(where_expression, source_columns);
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, local_context}.getActions(false, false);
Names required_columns = actions->getRequiredColumns();
for (const auto & required_column : required_columns)
ColumnWithTypeAndName header_column = header.getByPosition(column_index);
ColumnWithTypeAndName before_column = before_block_header.getByName(header_column.name);
/// If processed_stage is greater than FetchColumns and the block structure differs between streams,
/// the WHERE expression may be invalid because of convertingBlockInputStream,
/// so we need to throw an exception.
if (!header_column.type->equals(*before_column.type.get()))
{
if (required_column == header_column.name)
throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
+ "\n" + header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
NamesAndTypesList source_columns = metadata_snapshot->getSampleBlock().getNamesAndTypesList();
auto virtual_column = *getVirtuals().tryGetByName("_table");
source_columns.emplace_back(NameAndTypePair{virtual_column.name, virtual_column.type});
auto syntax_result = TreeRewriter(local_context).analyze(where_expression, source_columns);
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, local_context}.getActions(false, false);
Names required_columns = actions->getRequiredColumns();
for (const auto & required_column : required_columns)
{
if (required_column == header_column.name)
throw Exception("Block structure mismatch in Merge Storage: different types:\n" + before_block_header.dumpStructure()
+ "\n" + header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
}
}
}
}
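Besides stripping JOINs, the new getModifiedQueryInfo helper rewrites the _table and _database virtual columns to constants for the table currently being processed (the VirtualColumnUtils::rewriteEntityInAst calls above). A small illustration of those virtual columns from the user side, assuming the tt_m Merge table defined in the test added by this PR:

-- _table is a virtual column provided by the Merge engine holding the source table name;
-- after the per-table rewrite it is a constant, so the predicate can be evaluated per underlying table.
SELECT a, _table FROM tt_m WHERE _table = 'tt2';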

src/Storages/StorageMerge.h

@@ -129,6 +129,9 @@ protected:
const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases,
ContextPtr context, ASTPtr & query,
Pipe & pipe, QueryProcessingStage::Enum processed_stage);
static SelectQueryInfo getModifiedQueryInfo(
const SelectQueryInfo & query_info, ContextPtr modified_context, const StorageID & current_storage_id, bool is_merge_engine);
};
}

New test (reference output)

@@ -0,0 +1,53 @@
-- { echo }
SELECT * FROM tt_m order by a;
1 1
2 4
3 4
5 12
SELECT * FROM tt_m WHERE b != 0 order by b;
1 1
2 4
3 4
5 12
SELECT * FROM tt_m WHERE b != 1 order by b;
2 4
3 4
5 12
SELECT * FROM tt_m WHERE b != a * 2 order by b;
1 1
3 4
5 12
SELECT * FROM tt_m WHERE b / 2 != a order by b;
1 1
3 4
5 12
SELECT b FROM tt_m WHERE b >= 0 order by b;
1
4
4
12
SELECT b FROM tt_m WHERE b == 12;
12
SELECT b FROM tt_m ORDER BY b;
1
4
4
12
SELECT b, count() FROM tt_m GROUP BY b order by b;
1 1
4 2
12 1
SELECT b FROM tt_m order by b LIMIT 1 BY b;
1
4
12
SELECT a FROM tt_m WHERE b = 12;
5
SELECT max(a) FROM tt_m group by b order by b;
1
3
5
SELECT a FROM tt_m order by b LIMIT 1 BY b;
1
2
5

New test (SQL)

@@ -0,0 +1,34 @@
DROP TABLE IF EXISTS tt1;
DROP TABLE IF EXISTS tt2;
DROP TABLE IF EXISTS tt3;
DROP TABLE IF EXISTS tt4;
DROP TABLE IF EXISTS tt_m;
CREATE TABLE tt1 (a UInt32, b UInt32 ALIAS a) ENGINE = Memory;
CREATE TABLE tt2 (a UInt32, b UInt32 ALIAS a * 2) ENGINE = Memory;
CREATE TABLE tt3 (a UInt32, b UInt32 ALIAS c, c UInt32) ENGINE = Memory;
CREATE TABLE tt4 (a UInt32, b UInt32 ALIAS 12) ENGINE = Memory;
CREATE TABLE tt_m (a UInt32, b UInt32) ENGINE = Merge(currentDatabase(), 'tt1|tt2|tt3|tt4');
INSERT INTO tt1 VALUES (1);
INSERT INTO tt2 VALUES (2);
INSERT INTO tt3(a, c) VALUES (3, 4);
INSERT INTO tt4 VALUES (5);
-- { echo }
SELECT * FROM tt_m order by a;
SELECT * FROM tt_m WHERE b != 0 order by b;
SELECT * FROM tt_m WHERE b != 1 order by b;
SELECT * FROM tt_m WHERE b != a * 2 order by b;
SELECT * FROM tt_m WHERE b / 2 != a order by b;
SELECT b FROM tt_m WHERE b >= 0 order by b;
SELECT b FROM tt_m WHERE b == 12;
SELECT b FROM tt_m ORDER BY b;
SELECT b, count() FROM tt_m GROUP BY b order by b;
SELECT b FROM tt_m order by b LIMIT 1 BY b;
SELECT a FROM tt_m WHERE b = 12;
SELECT max(a) FROM tt_m group by b order by b;
SELECT a FROM tt_m order by b LIMIT 1 BY b;
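As a worked example of how the reference output above comes about: tt2 defines b as ALIAS a * 2, so the single row inserted into it surfaces through tt_m as (2, 4). Queried directly (not part of the test as shown):

SELECT a, b FROM tt2; -- returns 2 and 4, because b is computed as a * 2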