Backport #68672 to 24.3: Return back virtual columns to distributed tables

This commit is contained in:
robot-clickhouse 2024-08-28 20:10:25 +00:00
parent 515fd331e7
commit 4e6bfb7b87
5 changed files with 44 additions and 7 deletions

View File

@ -310,6 +310,10 @@ VirtualColumnsDescription StorageDistributed::createVirtuals()
desc.addEphemeral("_shard_num", std::make_shared<DataTypeUInt32>(), "Deprecated. Use function shardNum instead");
/// Add the virtual columns provided by tables with the Merge engine.
desc.addEphemeral("_database", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "The name of database which the row comes from");
desc.addEphemeral("_table", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "The name of table which the row comes from");
return desc;
}

View File

@ -641,10 +641,6 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
}
}
else
{
}
auto child = createPlanForTable(
nested_storage_snaphsot,
@ -656,6 +652,7 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
row_policy_data_opt,
modified_context,
current_streams);
child.plan.addInterpreterContext(modified_context);
if (child.plan.isInitialized())
@ -911,12 +908,14 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
modified_query_info.table_expression = replacement_table_expression;
modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression);
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
if (storage_snapshot_->storage.supportsSubcolumns())
get_column_options.withSubcolumns();
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All)
.withExtendedObjects()
.withSubcolumns(storage_snapshot_->storage.supportsSubcolumns());
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
/// Consider only the non-virtual columns of the storage when checking for the _table and _database columns,
/// i.e. always override virtual columns with these names coming from the underlying table (if any).
if (!storage_snapshot_->tryGetColumn(get_column_options, "_table"))
{
auto table_name_node = std::make_shared<ConstantNode>(current_storage_id.table_name);
@ -943,6 +942,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
column_name_to_node.emplace("_database", function_node);
}
get_column_options.withVirtuals();
auto storage_columns = storage_snapshot_->metadata->getColumns();
bool with_aliases = /* common_processed_stage == QueryProcessingStage::FetchColumns && */ !storage_columns.getAliases().empty();

View File

@ -33,6 +33,7 @@ DESCRIBE remote(test_shard_localhost, currentDatabase(), t_describe_options) FOR
6. │ t.b │ UInt64 │ │ │ │ ZSTD(1) │ │ 1 │
└───────────┴───────────────────────────┴──────────────┴────────────────────┴──────────────┴──────────────────┴────────────────┴──────────────┘
SET describe_compact_output = 0, describe_include_virtual_columns = 1, describe_include_subcolumns = 0;
DESCRIBE TABLE t_describe_options FORMAT PrettyCompactNoEscapes;
┌─name───────────┬─type──────────────────────┬─default_type─┬─default_expression─┬─comment─────────────────────────────────────────────────────────────────────────────────┬─codec_expression─┬─ttl_expression─┬─is_virtual─┐
1. │ id │ UInt64 │ │ │ index column │ │ │ 0 │

View File

@ -0,0 +1,8 @@
1 t_local_1
2 t_local_2
1 t_local_1
2 t_local_2
1 1
2 1
1 1
2 1

View File

@ -0,0 +1,24 @@
-- Regression test: the _table and _database columns must be selectable both from a
-- Merge table and from a Distributed table that reads from that Merge table.
--
-- Start from a clean state in case an earlier run left these tables behind.
DROP TABLE IF EXISTS t_local_1;
DROP TABLE IF EXISTS t_local_2;
DROP TABLE IF EXISTS t_merge;
DROP TABLE IF EXISTS t_distr;
-- Two MergeTree tables with a single UInt32 column; neither declares _table or _database.
CREATE TABLE t_local_1 (a UInt32) ENGINE = MergeTree ORDER BY a;
CREATE TABLE t_local_2 (a UInt32) ENGINE = MergeTree ORDER BY a;
-- One distinct row per table, so each result row can be traced back to its source table.
INSERT INTO t_local_1 VALUES (1);
INSERT INTO t_local_2 VALUES (2);
-- Merge table over the current database; the anchored regexp matches exactly t_local_1 and t_local_2.
CREATE TABLE t_merge AS t_local_1 ENGINE = Merge(currentDatabase(), '^(t_local_1|t_local_2)$');
-- Distributed table on the test_shard_localhost cluster that reads from t_merge (rand() is passed as the sharding key).
CREATE TABLE t_distr AS t_local_1 engine=Distributed('test_shard_localhost', currentDatabase(), t_merge, rand());
-- _table read directly from the Merge table, then through the Distributed table;
-- both queries are expected to print the same rows.
SELECT a, _table FROM t_merge ORDER BY a;
SELECT a, _table FROM t_distr ORDER BY a;
-- _database is compared with currentDatabase() instead of being printed
-- (presumably because the database name differs between test runs - confirm against the test harness).
SELECT a, _database = currentDatabase() FROM t_merge ORDER BY a;
SELECT a, _database = currentDatabase() FROM t_distr ORDER BY a;
-- Remove everything the test created.
DROP TABLE IF EXISTS t_local_1;
DROP TABLE IF EXISTS t_local_2;
DROP TABLE IF EXISTS t_merge;
DROP TABLE IF EXISTS t_distr;