mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Backport #68672 to 24.8: Return back virtual columns to distributed tables
This commit is contained in:
parent
d9be15efe5
commit
d58f833cca
@ -290,6 +290,10 @@ VirtualColumnsDescription StorageDistributed::createVirtuals()
|
|||||||
|
|
||||||
desc.addEphemeral("_shard_num", std::make_shared<DataTypeUInt32>(), "Deprecated. Use function shardNum instead");
|
desc.addEphemeral("_shard_num", std::make_shared<DataTypeUInt32>(), "Deprecated. Use function shardNum instead");
|
||||||
|
|
||||||
|
/// Add virtual columns from table with Merge engine.
|
||||||
|
desc.addEphemeral("_database", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "The name of database which the row comes from");
|
||||||
|
desc.addEphemeral("_table", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "The name of table which the row comes from");
|
||||||
|
|
||||||
return desc;
|
return desc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,10 +642,6 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
|
|||||||
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
|
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
auto child = createPlanForTable(
|
auto child = createPlanForTable(
|
||||||
nested_storage_snaphsot,
|
nested_storage_snaphsot,
|
||||||
@ -657,6 +653,7 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
|
|||||||
row_policy_data_opt,
|
row_policy_data_opt,
|
||||||
modified_context,
|
modified_context,
|
||||||
current_streams);
|
current_streams);
|
||||||
|
|
||||||
child.plan.addInterpreterContext(modified_context);
|
child.plan.addInterpreterContext(modified_context);
|
||||||
|
|
||||||
if (child.plan.isInitialized())
|
if (child.plan.isInitialized())
|
||||||
@ -914,12 +911,14 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
|
|||||||
modified_query_info.table_expression = replacement_table_expression;
|
modified_query_info.table_expression = replacement_table_expression;
|
||||||
modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression);
|
modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression);
|
||||||
|
|
||||||
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
|
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All)
|
||||||
if (storage_snapshot_->storage.supportsSubcolumns())
|
.withExtendedObjects()
|
||||||
get_column_options.withSubcolumns();
|
.withSubcolumns(storage_snapshot_->storage.supportsSubcolumns());
|
||||||
|
|
||||||
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
|
std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;
|
||||||
|
|
||||||
|
/// Consider only non-virtual columns of storage while checking for _table and _database columns.
|
||||||
|
/// I.e. always override virtual columns with these names from underlying table (if any).
|
||||||
if (!storage_snapshot_->tryGetColumn(get_column_options, "_table"))
|
if (!storage_snapshot_->tryGetColumn(get_column_options, "_table"))
|
||||||
{
|
{
|
||||||
auto table_name_node = std::make_shared<ConstantNode>(current_storage_id.table_name);
|
auto table_name_node = std::make_shared<ConstantNode>(current_storage_id.table_name);
|
||||||
@ -946,6 +945,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
|
|||||||
column_name_to_node.emplace("_database", function_node);
|
column_name_to_node.emplace("_database", function_node);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get_column_options.withVirtuals();
|
||||||
auto storage_columns = storage_snapshot_->metadata->getColumns();
|
auto storage_columns = storage_snapshot_->metadata->getColumns();
|
||||||
|
|
||||||
bool with_aliases = /* common_processed_stage == QueryProcessingStage::FetchColumns && */ !storage_columns.getAliases().empty();
|
bool with_aliases = /* common_processed_stage == QueryProcessingStage::FetchColumns && */ !storage_columns.getAliases().empty();
|
||||||
|
@ -54,6 +54,8 @@ _row_exists UInt8 Persisted mask created by lightweight delete that show wheth
|
|||||||
_block_number UInt64 Persisted original number of block that was assigned at insert Delta, LZ4 1
|
_block_number UInt64 Persisted original number of block that was assigned at insert Delta, LZ4 1
|
||||||
_block_offset UInt64 Persisted original number of row in block that was assigned at insert Delta, LZ4 1
|
_block_offset UInt64 Persisted original number of row in block that was assigned at insert Delta, LZ4 1
|
||||||
_shard_num UInt32 Deprecated. Use function shardNum instead 1
|
_shard_num UInt32 Deprecated. Use function shardNum instead 1
|
||||||
|
_database LowCardinality(String) The name of database which the row comes from 1
|
||||||
|
_table LowCardinality(String) The name of table which the row comes from 1
|
||||||
SET describe_compact_output = 0, describe_include_virtual_columns = 1, describe_include_subcolumns = 1;
|
SET describe_compact_output = 0, describe_include_virtual_columns = 1, describe_include_subcolumns = 1;
|
||||||
DESCRIBE TABLE t_describe_options;
|
DESCRIBE TABLE t_describe_options;
|
||||||
id UInt64 index column 0 0
|
id UInt64 index column 0 0
|
||||||
@ -87,6 +89,8 @@ _row_exists UInt8 Persisted mask created by lightweight delete that show wheth
|
|||||||
_block_number UInt64 Persisted original number of block that was assigned at insert Delta, LZ4 0 1
|
_block_number UInt64 Persisted original number of block that was assigned at insert Delta, LZ4 0 1
|
||||||
_block_offset UInt64 Persisted original number of row in block that was assigned at insert Delta, LZ4 0 1
|
_block_offset UInt64 Persisted original number of row in block that was assigned at insert Delta, LZ4 0 1
|
||||||
_shard_num UInt32 Deprecated. Use function shardNum instead 0 1
|
_shard_num UInt32 Deprecated. Use function shardNum instead 0 1
|
||||||
|
_database LowCardinality(String) The name of database which the row comes from 0 1
|
||||||
|
_table LowCardinality(String) The name of table which the row comes from 0 1
|
||||||
arr.size0 UInt64 1 0
|
arr.size0 UInt64 1 0
|
||||||
t.a String ZSTD(1) 1 0
|
t.a String ZSTD(1) 1 0
|
||||||
t.b UInt64 ZSTD(1) 1 0
|
t.b UInt64 ZSTD(1) 1 0
|
||||||
@ -144,6 +148,8 @@ _row_exists UInt8 1
|
|||||||
_block_number UInt64 1
|
_block_number UInt64 1
|
||||||
_block_offset UInt64 1
|
_block_offset UInt64 1
|
||||||
_shard_num UInt32 1
|
_shard_num UInt32 1
|
||||||
|
_database LowCardinality(String) 1
|
||||||
|
_table LowCardinality(String) 1
|
||||||
SET describe_compact_output = 1, describe_include_virtual_columns = 1, describe_include_subcolumns = 1;
|
SET describe_compact_output = 1, describe_include_virtual_columns = 1, describe_include_subcolumns = 1;
|
||||||
DESCRIBE TABLE t_describe_options;
|
DESCRIBE TABLE t_describe_options;
|
||||||
id UInt64 0 0
|
id UInt64 0 0
|
||||||
@ -177,6 +183,8 @@ _row_exists UInt8 0 1
|
|||||||
_block_number UInt64 0 1
|
_block_number UInt64 0 1
|
||||||
_block_offset UInt64 0 1
|
_block_offset UInt64 0 1
|
||||||
_shard_num UInt32 0 1
|
_shard_num UInt32 0 1
|
||||||
|
_database LowCardinality(String) 0 1
|
||||||
|
_table LowCardinality(String) 0 1
|
||||||
arr.size0 UInt64 1 0
|
arr.size0 UInt64 1 0
|
||||||
t.a String 1 0
|
t.a String 1 0
|
||||||
t.b UInt64 1 0
|
t.b UInt64 1 0
|
||||||
|
@ -0,0 +1,8 @@
|
|||||||
|
1 t_local_1
|
||||||
|
2 t_local_2
|
||||||
|
1 t_local_1
|
||||||
|
2 t_local_2
|
||||||
|
1 1
|
||||||
|
2 1
|
||||||
|
1 1
|
||||||
|
2 1
|
@ -0,0 +1,24 @@
|
|||||||
|
DROP TABLE IF EXISTS t_local_1;
|
||||||
|
DROP TABLE IF EXISTS t_local_2;
|
||||||
|
DROP TABLE IF EXISTS t_merge;
|
||||||
|
DROP TABLE IF EXISTS t_distr;
|
||||||
|
|
||||||
|
CREATE TABLE t_local_1 (a UInt32) ENGINE = MergeTree ORDER BY a;
|
||||||
|
CREATE TABLE t_local_2 (a UInt32) ENGINE = MergeTree ORDER BY a;
|
||||||
|
|
||||||
|
INSERT INTO t_local_1 VALUES (1);
|
||||||
|
INSERT INTO t_local_2 VALUES (2);
|
||||||
|
|
||||||
|
CREATE TABLE t_merge AS t_local_1 ENGINE = Merge(currentDatabase(), '^(t_local_1|t_local_2)$');
|
||||||
|
CREATE TABLE t_distr AS t_local_1 engine=Distributed('test_shard_localhost', currentDatabase(), t_merge, rand());
|
||||||
|
|
||||||
|
SELECT a, _table FROM t_merge ORDER BY a;
|
||||||
|
SELECT a, _table FROM t_distr ORDER BY a;
|
||||||
|
|
||||||
|
SELECT a, _database = currentDatabase() FROM t_merge ORDER BY a;
|
||||||
|
SELECT a, _database = currentDatabase() FROM t_distr ORDER BY a;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS t_local_1;
|
||||||
|
DROP TABLE IF EXISTS t_local_2;
|
||||||
|
DROP TABLE IF EXISTS t_merge;
|
||||||
|
DROP TABLE IF EXISTS t_distr;
|
Loading…
Reference in New Issue
Block a user