Merge pull request #25694 from kssenii/storage-merge-aliases

Aliases for StorageMerge fix
Anton Popov 2021-06-28 16:03:48 +03:00 committed by GitHub
commit 970af529a0
5 changed files with 159 additions and 14 deletions
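
For context, a minimal sketch of the behaviour this change targets (table names here are illustrative, not part of the commit): an ALIAS column defined only in an underlying table should be resolvable when queried through a Merge table. The full regression test added by this commit appears further below.

drop table if exists m_all;
drop table if exists m_local;
create table m_local (d Date, x Int32, x_doubled Int64 alias x * 2) engine = MergeTree order by d;
create table m_all (d Date, x Int32, x_doubled Int64) engine = Merge(currentDatabase(), '^m_local$');
insert into m_local (d, x) values ('2020-01-01', 21);
select x_doubled from m_all; -- the alias is resolved against m_local; expected: 42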


@@ -11,6 +11,8 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
@@ -205,7 +207,7 @@ Pipe StorageMerge::read(
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
return createSources(
-{}, query_info, processed_stage, max_block_size, header, {}, real_column_names, modified_context, 0, has_table_virtual_column);
+{}, query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
@@ -233,6 +235,9 @@ Pipe StorageMerge::read(
query_info.input_order_info = input_sorting_info;
}
auto sample_block = getInMemoryMetadataPtr()->getSampleBlock();
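/// Physical columns to request from the current source table when some requested columns are ALIAS columns (filled per table in the loop below).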
Names required_columns;
for (const auto & table : selected_tables)
{
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
@@ -246,12 +251,51 @@
if (query_info.query->as<ASTSelectQuery>()->sampleSize() && !storage->supportsSampling())
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
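/// When reading at the FetchColumns stage, rewrite each requested ALIAS column of this table
/// into its underlying expression (with a cast to the declared type) and collect the physical
/// columns those expressions actually need.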
Aliases aliases;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_columns = storage_metadata_snapshot->getColumns();
if (processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty())
{
auto syntax_result = TreeRewriter(local_context).analyzeSelect(query_info.query, TreeRewriterResult({}, storage, storage_metadata_snapshot));
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
for (const auto & column : real_column_names)
{
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
if (is_alias)
{
column_expr = column_default->expression->clone();
replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(),
syntax_result->array_join_result_to_source, local_context);
auto column_description = storage_columns.get(column);
column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(),
storage_metadata_snapshot->getColumns().getAll(), local_context);
column_expr = setAlias(column_expr, column);
auto type = sample_block.getByName(column).type;
aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() });
}
else
column_expr = std::make_shared<ASTIdentifier>(column);
required_columns_expr_list->children.emplace_back(std::move(column_expr));
}
syntax_result = TreeRewriter(local_context).analyze(required_columns_expr_list, storage_columns.getAllPhysical(),
storage, storage_metadata_snapshot);
auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, local_context).getActionsDAG(true);
required_columns = alias_actions->getRequiredColumns().getNames();
}
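/// Read the physical columns behind the requested aliases from the source table; the alias expressions themselves are re-applied in convertingSourceStream().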
auto source_pipe = createSources(
storage_metadata_snapshot, query_info, processed_stage,
-max_block_size, header, table, real_column_names, modified_context,
-current_streams, has_table_virtual_column);
+max_block_size, header, aliases, table, required_columns.empty() ? real_column_names : required_columns,
+modified_context, current_streams, has_table_virtual_column);
pipes.emplace_back(std::move(source_pipe));
}
@@ -272,6 +316,7 @@ Pipe StorageMerge::createSources(
const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size,
const Block & header,
const Aliases & aliases,
const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
ContextMutablePtr modified_context,
@@ -369,7 +414,7 @@ Pipe StorageMerge::createSources(
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
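/// The aliases resolved in read() are passed down so each source stream can compute them before the final conversion.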
-convertingSourceStream(header, metadata_snapshot, modified_context, modified_query_info.query, pipe, processed_stage);
+convertingSourceStream(header, metadata_snapshot, aliases, modified_context, modified_query_info.query, pipe, processed_stage);
pipe.addTableLock(struct_lock);
pipe.addStorageHolder(storage);
@@ -492,6 +537,7 @@ void StorageMerge::alter(
void StorageMerge::convertingSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
const Aliases & aliases,
ContextPtr local_context,
ASTPtr & query,
Pipe & pipe,
@@ -499,16 +545,37 @@ void StorageMerge::convertingSourceStream(
{
Block before_block_header = pipe.getHeader();
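/// The single converting transform below is replaced: each ALIAS column is first materialized by its own ExpressionTransform, then a final conversion matches the Merge table's header by name.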
-auto convert_actions_dag = ActionsDAG::makeConvertingActions(
-pipe.getHeader().getColumnsWithTypeAndName(),
-header.getColumnsWithTypeAndName(),
-ActionsDAG::MatchColumnsMode::Name);
-auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
-pipe.addSimpleTransform([&](const Block & stream_header)
-{
-return std::make_shared<ExpressionTransform>(stream_header, convert_actions);
-});
+auto storage_sample_block = metadata_snapshot->getSampleBlock();
+auto pipe_columns = pipe.getHeader().getNamesAndTypesList();
+for (const auto & alias : aliases)
+{
+pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type));
+ASTPtr expr = std::move(alias.expression);
+auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns);
+auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context};
+auto dag = std::make_shared<ActionsDAG>(pipe_columns);
+auto actions_dag = expression_analyzer.getActionsDAG(true, false);
+auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
+pipe.addSimpleTransform([&](const Block & stream_header)
+{
+return std::make_shared<ExpressionTransform>(stream_header, actions);
+});
+}
+{
+auto convert_actions_dag = ActionsDAG::makeConvertingActions(pipe.getHeader().getColumnsWithTypeAndName(),
+header.getColumnsWithTypeAndName(),
+ActionsDAG::MatchColumnsMode::Name);
+auto actions = std::make_shared<ExpressionActions>(convert_actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
+pipe.addSimpleTransform([&](const Block & stream_header)
+{
+return std::make_shared<ExpressionTransform>(stream_header, actions);
+});
+}
auto where_expression = query->as<ASTSelectQuery>()->where();


@@ -84,12 +84,22 @@ protected:
const String & source_table_regexp_,
ContextPtr context_);
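/// An ALIAS column resolved against one underlying table: its name, result type, and the rewritten expression used to compute it per source stream.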
struct AliasData
{
String name;
DataTypePtr type;
ASTPtr expression;
};
using Aliases = std::vector<AliasData>;
Pipe createSources(
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
UInt64 max_block_size,
const Block & header,
const Aliases & aliases,
const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
ContextMutablePtr modified_context,
@@ -98,7 +108,7 @@ protected:
bool concat_streams = false);
void convertingSourceStream(
-const Block & header, const StorageMetadataPtr & metadata_snapshot,
+const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases,
ContextPtr context, ASTPtr & query,
Pipe & pipe, QueryProcessingStage::Enum processed_stage);
};


@@ -0,0 +1,10 @@
alias1
1 4 16 23
23 16 4 1
2020-02-02 1 4 2 16 3 23
alias2
1 3 4 4
4 4 3 1
23 16 4 1
2020-02-01 1 3 2 4 3 4
2020-02-02 1 4 2 16 3 23


@@ -0,0 +1,57 @@
drop table if exists merge;
create table merge
(
dt Date,
colAlias0 Int32,
colAlias1 Int32,
col2 Int32,
colAlias2 UInt32,
col3 Int32,
colAlias3 UInt32
)
engine = Merge(currentDatabase(), '^alias_');
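-- In the Merge table the alias columns are declared as ordinary columns; each underlying table matched by '^alias_' defines them as ALIAS expressions over its own physical columns.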
drop table if exists alias_1;
drop table if exists alias_2;
create table alias_1
(
dt Date,
col Int32,
colAlias0 UInt32 alias col,
colAlias1 UInt32 alias col3 + colAlias0,
col2 Int32,
colAlias2 Int32 alias colAlias1 + col2 + 10,
col3 Int32,
colAlias3 Int32 alias colAlias2 + colAlias1 + col3
)
engine = MergeTree()
order by (dt);
insert into alias_1 (dt, col, col2, col3) values ('2020-02-02', 1, 2, 3);
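-- For this row the aliases evaluate to: colAlias0 = col = 1, colAlias1 = col3 + colAlias0 = 4,
-- colAlias2 = colAlias1 + col2 + 10 = 16, colAlias3 = colAlias2 + colAlias1 + col3 = 23
-- (matching the '1 4 16 23' line of the expected output above).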
select 'alias1';
select colAlias0, colAlias1, colAlias2, colAlias3 from alias_1;
select colAlias3, colAlias2, colAlias1, colAlias0 from merge;
select * from merge;
create table alias_2
(
dt Date,
col Int32,
col2 Int32,
colAlias0 UInt32 alias col,
colAlias3 Int32 alias col3 + colAlias0,
colAlias1 UInt32 alias colAlias0 + col2,
colAlias2 Int32 alias colAlias0 + colAlias1,
col3 Int32
)
engine = MergeTree()
order by (dt);
insert into alias_2 (dt, col, col2, col3) values ('2020-02-01', 1, 2, 3);
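-- For this row: colAlias0 = col = 1, colAlias1 = colAlias0 + col2 = 3,
-- colAlias2 = colAlias0 + colAlias1 = 4, colAlias3 = col3 + colAlias0 = 4
-- (the '1 3 4 4' line of the expected output above).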
select 'alias2';
select colAlias0, colAlias1, colAlias2, colAlias3 from alias_2;
select colAlias3, colAlias2, colAlias1, colAlias0 from merge order by dt;
select * from merge order by dt;


@@ -852,6 +852,7 @@
"01921_concurrent_ttl_and_normal_merges_zookeeper_long", // heavy test, better to run sequentially
"01913_replace_dictionary",
"01914_exchange_dictionaries",
"01915_create_or_replace_dictionary"
"01915_create_or_replace_dictionary",
"01925_test_storage_merge_aliases"
]
}