From 217659c1c61a9e15dde1590e6d40bf8cec166c4f Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Mon, 6 Sep 2021 06:11:16 +0300
Subject: [PATCH 1/2] Add test.

---
 .../__init__.py                               |   0
 .../configs/test_cluster.xml                  |  18 +++
 .../test.py                                   | 116 ++++++++++++++++++
 3 files changed, 134 insertions(+)
 create mode 100644 tests/integration/test_sharding_key_from_default_column/__init__.py
 create mode 100644 tests/integration/test_sharding_key_from_default_column/configs/test_cluster.xml
 create mode 100644 tests/integration/test_sharding_key_from_default_column/test.py

diff --git a/tests/integration/test_sharding_key_from_default_column/__init__.py b/tests/integration/test_sharding_key_from_default_column/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_sharding_key_from_default_column/configs/test_cluster.xml b/tests/integration/test_sharding_key_from_default_column/configs/test_cluster.xml
new file mode 100644
index 00000000000..0437e047fad
--- /dev/null
+++ b/tests/integration/test_sharding_key_from_default_column/configs/test_cluster.xml
@@ -0,0 +1,18 @@
+<clickhouse>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</clickhouse>
diff --git a/tests/integration/test_sharding_key_from_default_column/test.py b/tests/integration/test_sharding_key_from_default_column/test.py
new file mode 100644
index 00000000000..1717a1ee14a
--- /dev/null
+++ b/tests/integration/test_sharding_key_from_default_column/test.py
@@ -0,0 +1,116 @@
+import pytest
+import itertools
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import TSV
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance('node1', main_configs=['configs/test_cluster.xml'], with_zookeeper=True)
+node2 = cluster.add_instance('node2', main_configs=['configs/test_cluster.xml'], with_zookeeper=True)
+
+
+@pytest.fixture(scope="module", autouse=True)
+def start_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+@pytest.fixture(autouse=True)
+def cleanup_after_test():
+    try:
+        yield
+    finally:
+        node1.query("DROP TABLE IF EXISTS dist ON CLUSTER 'test_cluster'")
+        node1.query("DROP TABLE IF EXISTS local ON CLUSTER 'test_cluster'")
+
+
+# A default column is used in the sharding key expression.
+def test_default_column():
+    node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 DEFAULT x + 100, z Int32 DEFAULT x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
+    node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 DEFAULT x + 200, z Int32 DEFAULT x - y) ENGINE = MergeTree() ORDER BY y")
+
+    for insert_sync in [0, 1]:
+        settings = {'insert_distributed_sync': insert_sync}
+
+        # INSERT INTO TABLE dist (x)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
+        node1.query("SYSTEM FLUSH DISTRIBUTED dist")
+        assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 102, 104], [4, 104, 108]])
+        assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 101, 102], [3, 103, 106]])
+        assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 102, 104], [4, 104, 108], [1, 101, 102], [3, 103, 106]])
+
+        # INSERT INTO TABLE dist (x, y)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        node1.query("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
+        node1.query("SYSTEM FLUSH DISTRIBUTED dist")
+        assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 22, 24]])
+        assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 11, 12], [3, 33, 36]])
+        assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 22, 24], [1, 11, 12], [3, 33, 36]])
+
+
+# A materialized column is used in the sharding key expression and `insert_allow_materialized_columns` set to 1.
+def test_materialized_column_allow_insert_materialized():
+    node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 100, z Int32 MATERIALIZED x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
+    node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 200, z Int32 MATERIALIZED x - y) ENGINE = MergeTree() ORDER BY y")
+
+    for insert_sync in [0, 1]:
+        settings = {'insert_distributed_sync': insert_sync, 'insert_allow_materialized_columns': 1}
+
+        # INSERT INTO TABLE dist (x)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
+        node1.query("SYSTEM FLUSH DISTRIBUTED dist")
+        assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 102, 104], [4, 104, 108]])
+        assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 101, 102], [3, 103, 106]])
+        assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 102, 104], [4, 104, 108], [1, 101, 102], [3, 103, 106]])
+
+        # INSERT INTO TABLE dist (x, y)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        node1.query("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
+        node1.query("SYSTEM FLUSH DISTRIBUTED dist")
+        assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 22, 24]])
+        assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 11, 12], [3, 33, 36]])
+        assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 22, 24], [1, 11, 12], [3, 33, 36]])
+
+
+# A materialized column is used in the sharding key expression and `insert_allow_materialized_columns` set to 0.
+def test_materialized_column_disallow_insert_materialized():
+    node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 100, z Int32 MATERIALIZED x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
+    node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 200, z Int32 MATERIALIZED x - y) ENGINE = MergeTree() ORDER BY y")
+
+    for insert_sync in [0, 1]:
+        settings = {'insert_distributed_sync': insert_sync, 'insert_allow_materialized_columns': 0}
+
+        # INSERT INTO TABLE dist (x)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
+        node1.query("SYSTEM FLUSH DISTRIBUTED dist")
+        assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 202, -200], [4, 204, -200]])
+        assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 201, -200], [3, 203, -200]])
+        assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 202, -200], [4, 204, -200], [1, 201, -200], [3, 203, -200]])
+
+        # INSERT INTO TABLE dist (x, y)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        expected_error = "Cannot insert column y, because it is MATERIALIZED column"
+        assert expected_error in node1.query_and_get_error("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
+
+
+# Almost the same as the previous test `test_materialized_column_disallow_insert_materialized`, but the sharding key has different values.
+def test_materialized_column_disallow_insert_materialized_different_shards():
+    node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 101, z Int32 MATERIALIZED x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
+    node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 200, z Int32 MATERIALIZED x - y) ENGINE = MergeTree() ORDER BY y")
+
+    for insert_sync in [0, 1]:
+        settings = {'insert_distributed_sync': insert_sync, 'insert_allow_materialized_columns': 0}
+
+        # INSERT INTO TABLE dist (x)
+        node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
+        node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
+        node1.query("SYSTEM FLUSH DISTRIBUTED dist")
+        assert node1.query("SELECT x, y, z FROM local") == TSV([[1, 201, -200], [3, 203, -200]])
+        assert node2.query("SELECT x, y, z FROM local") == TSV([[2, 202, -200], [4, 204, -200]])
+        assert node1.query("SELECT x, y, z FROM dist") == TSV([[1, 201, -200], [3, 203, -200], [2, 202, -200], [4, 204, -200]])

From 1636ee24bb419499b354123183229d1891ca5670 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Sun, 3 Oct 2021 23:06:31 +0300
Subject: [PATCH 2/2] Fix using materialized column as sharding key.
---
 src/Storages/Distributed/DistributedSink.cpp | 67 ++++++++++++--------
 src/Storages/Distributed/DistributedSink.h   | 10 ++-
 src/Storages/StorageDistributed.cpp          | 30 ++-------
 3 files changed, 56 insertions(+), 51 deletions(-)

diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp
index 6596598476d..4d95911bb64 100644
--- a/src/Storages/Distributed/DistributedSink.cpp
+++ b/src/Storages/Distributed/DistributedSink.cpp
@@ -3,6 +3,8 @@
 #include
 #include
+#include
+#include
 #include
 #include
@@ -87,26 +89,40 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block &
 }
 
 
+static ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Names & column_names)
+{
+    auto query = std::make_shared<ASTInsertQuery>();
+    query->table_id = StorageID(database, table);
+    auto columns = std::make_shared<ASTExpressionList>();
+    query->columns = columns;
+    query->children.push_back(columns);
+    for (const auto & column_name : column_names)
+        columns->children.push_back(std::make_shared<ASTIdentifier>(column_name));
+    return query;
+}
+
+
 DistributedSink::DistributedSink(
     ContextPtr context_,
     StorageDistributed & storage_,
     const StorageMetadataPtr & metadata_snapshot_,
-    const ASTPtr & query_ast_,
     const ClusterPtr & cluster_,
     bool insert_sync_,
     UInt64 insert_timeout_,
-    StorageID main_table_)
+    StorageID main_table_,
+    const Names & columns_to_send_)
     : SinkToStorage(metadata_snapshot_->getSampleBlock())
     , context(Context::createCopy(context_))
     , storage(storage_)
     , metadata_snapshot(metadata_snapshot_)
-    , query_ast(query_ast_)
-    , query_string(queryToString(query_ast_))
+    , query_ast(createInsertToRemoteTableQuery(main_table_.database_name, main_table_.table_name, columns_to_send_))
+    , query_string(queryToString(query_ast))
     , cluster(cluster_)
     , insert_sync(insert_sync_)
     , allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
     , insert_timeout(insert_timeout_)
     , main_table(main_table_)
+    , columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
     , log(&Poco::Logger::get("DistributedBlockOutputStream"))
 {
     const auto & settings = context->getSettingsRef();
@@ -127,27 +143,25 @@ void DistributedSink::consume(Chunk chunk)
 
     auto ordinary_block = getHeader().cloneWithColumns(chunk.detachColumns());
 
-    if (!allow_materialized)
-    {
-        /* They are added by the AddingDefaultBlockOutputStream, and we will get
-         * different number of columns eventually */
-        for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
-        {
-            if (ordinary_block.has(col.name))
-            {
-                ordinary_block.erase(col.name);
-                LOG_DEBUG(log, "{}: column {} will be removed, because it is MATERIALIZED",
-                    storage.getStorageID().getNameForLogs(), col.name);
-            }
-        }
-    }
-
     if (insert_sync)
         writeSync(ordinary_block);
    else
        writeAsync(ordinary_block);
 }
 
+
+Block DistributedSink::removeSuperfluousColumns(Block block) const
+{
+    for (size_t i = block.columns(); i;)
+    {
+        --i;
+        if (!columns_to_send.contains(block.getByPosition(i).name))
+            block.erase(i);
+    }
+    return block;
+}
+
+
 void DistributedSink::writeAsync(const Block & block)
 {
     if (random_shard_insert)
@@ -402,6 +416,8 @@ void DistributedSink::writeSync(const Block & block)
 {
     const Settings & settings = context->getSettingsRef();
     const auto & shards_info = cluster->getShardsInfo();
+    Block block_to_send = removeSuperfluousColumns(block);
+
     size_t start = 0;
     size_t end = shards_info.size();
 
@@ -414,7 +430,7 @@ void DistributedSink::writeSync(const Block & block)
     if (!pool)
     {
         /// Deferred initialization. Only for sync insertion.
-        initWritingJobs(block, start, end);
+        initWritingJobs(block_to_send, start, end);
 
         size_t jobs_count = random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count);
         size_t max_threads = std::min(settings.max_distributed_connections, jobs_count);
@@ -459,7 +475,7 @@ void DistributedSink::writeSync(const Block & block)
         finished_jobs_count = 0;
         for (size_t shard_index : collections::range(start, end))
             for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
-                pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
+                pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
     }
     catch (...)
     {
@@ -584,12 +600,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
 {
     const auto & shard_info = cluster->getShardsInfo()[shard_id];
     const auto & settings = context->getSettingsRef();
+    Block block_to_send = removeSuperfluousColumns(block);
 
     if (shard_info.hasInternalReplication())
     {
         if (shard_info.isLocal() && settings.prefer_localhost_replica)
             /// Prefer insert into current instance directly
-            writeToLocal(block, shard_info.getLocalNodeCount());
+            writeToLocal(block_to_send, shard_info.getLocalNodeCount());
         else
         {
             const auto & path = shard_info.insertPathForInternalReplication(
                 settings.use_compact_format_in_distributed_parts_names);
             if (path.empty())
                 throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
-            writeToShard(block, {path});
+            writeToShard(block_to_send, {path});
         }
     }
     else
     {
         if (shard_info.isLocal() && settings.prefer_localhost_replica)
-            writeToLocal(block, shard_info.getLocalNodeCount());
+            writeToLocal(block_to_send, shard_info.getLocalNodeCount());
 
         std::vector<std::string> dir_names;
         for (const auto & address : cluster->getShardsAddresses()[shard_id])
             dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
 
         if (!dir_names.empty())
-            writeToShard(block, dir_names);
+            writeToShard(block_to_send, dir_names);
     }
 }
diff --git a/src/Storages/Distributed/DistributedSink.h b/src/Storages/Distributed/DistributedSink.h
index 1fdf5c0291f..3c0b6333fc1 100644
--- a/src/Storages/Distributed/DistributedSink.h
+++ b/src/Storages/Distributed/DistributedSink.h
@@ -43,11 +43,11 @@ public:
         ContextPtr context_,
         StorageDistributed & storage_,
         const StorageMetadataPtr & metadata_snapshot_,
-        const ASTPtr & query_ast_,
         const ClusterPtr & cluster_,
         bool insert_sync_,
         UInt64 insert_timeout_,
-        StorageID main_table_);
+        StorageID main_table_,
+        const Names & columns_to_send_);
 
     String getName() const override { return "DistributedSink"; }
     void consume(Chunk chunk) override;
@@ -65,6 +65,9 @@ private:
 
     void writeAsyncImpl(const Block & block, size_t shard_id = 0);
 
+    /// Removes columns which should not be sent to shards.
+    Block removeSuperfluousColumns(Block block) const;
+
     /// Increments finished_writings_count after each repeat.
     void writeToLocal(const Block & block, size_t repeats);
 
@@ -84,7 +87,9 @@ private:
     /// Returns the number of blocks was written for each cluster node. Uses during exception handling.
     std::string getCurrentStateDescription();
 
+    /// Context used for writing to remote tables.
     ContextMutablePtr context;
+
     StorageDistributed & storage;
     StorageMetadataPtr metadata_snapshot;
     ASTPtr query_ast;
@@ -102,6 +107,7 @@ private:
     /// Sync-related stuff
     UInt64 insert_timeout; // in seconds
     StorageID main_table;
+    NameSet columns_to_send;
     Stopwatch watch;
     Stopwatch watch_current_block;
     std::optional<ThreadPool> pool;
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index dd304065a14..b9c15e19c33 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -159,23 +159,6 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
     return modified_query_ast;
 }
 
-/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
-/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
-/// the sample block instead.
-ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
-{
-    auto query = std::make_shared<ASTInsertQuery>();
-    query->table_id = StorageID(database, table);
-
-    auto columns = std::make_shared<ASTExpressionList>();
-    query->columns = columns;
-    query->children.push_back(columns);
-    for (const auto & col : sample_block)
-        columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
-
-    return query;
-}
-
 /// Calculate maximum number in file names in directory and all subdirectories.
 /// To ensure global order of data blocks yet to be sent across server restarts.
 UInt64 getMaximumFileNumber(const std::string & dir_path)
@@ -682,17 +665,16 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
     bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
     auto timeout = settings.insert_distributed_timeout;
 
-    Block sample_block;
-    if (!settings.insert_allow_materialized_columns)
-        sample_block = metadata_snapshot->getSampleBlockNonMaterialized();
+    Names columns_to_send;
+    if (settings.insert_allow_materialized_columns)
+        columns_to_send = metadata_snapshot->getSampleBlock().getNames();
     else
-        sample_block = metadata_snapshot->getSampleBlock();
+        columns_to_send = metadata_snapshot->getSampleBlockNonMaterialized().getNames();
 
     /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
     return std::make_shared<DistributedSink>(
-        local_context, *this, metadata_snapshot,
-        createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
-        cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
+        local_context, *this, metadata_snapshot, cluster, insert_sync, timeout,
+        StorageID{remote_database, remote_table}, columns_to_send);
 }
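
A minimal standalone sketch (not part of either patch) of the behaviour the new `columns_to_send` filtering is meant to produce when `insert_allow_materialized_columns = 0`: the Distributed table's own MATERIALIZED expression for the sharding key decides the target shard, while each shard's local table recomputes the MATERIALIZED columns with its own expressions. The helper name `route_and_store` and the two-equal-weight-shard assumption are illustrative only; the expected values mirror `test_materialized_column_disallow_insert_materialized` above.

# Standalone illustration (hypothetical helper, not ClickHouse code).
def route_and_store(xs, shard_count=2):
    """Model INSERT INTO dist (x) VALUES ... with insert_allow_materialized_columns = 0."""
    shards = {i: [] for i in range(shard_count)}
    for x in xs:
        sharding_key = x + 100      # y as MATERIALIZED by the Distributed table: decides the shard
        shard = sharding_key % shard_count
        y = x + 200                 # y as recomputed by the local table on the target shard
        z = x - y                   # z as recomputed by the local table
        shards[shard].append((x, y, z))
    return shards

assert route_and_store([1, 2, 3, 4]) == {
    0: [(2, 202, -200), (4, 204, -200)],  # rows stored on node1
    1: [(1, 201, -200), (3, 203, -200)],  # rows stored on node2
}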