From 4279c7da41dd66597992b1bd33ec885c3d6c4371 Mon Sep 17 00:00:00 2001 From: feng lv Date: Tue, 2 Feb 2021 02:25:19 +0000 Subject: [PATCH] add setting insert_shard_id add test fix style fix --- src/Common/ErrorCodes.cpp | 3 +- src/Core/Settings.h | 1 + .../DistributedBlockOutputStream.cpp | 11 ++++- .../DistributedBlockOutputStream.h | 4 +- src/Storages/StorageDistributed.cpp | 20 +++++++--- .../01684_insert_specify_shard_id.reference | 40 +++++++++++++++++++ .../01684_insert_specify_shard_id.sql | 22 ++++++++++ 7 files changed, 90 insertions(+), 11 deletions(-) create mode 100644 tests/queries/0_stateless/01684_insert_specify_shard_id.reference create mode 100644 tests/queries/0_stateless/01684_insert_specify_shard_id.sql diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index a2cd65137c0..07df3f51546 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -537,7 +537,8 @@ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ - M(1002, UNKNOWN_EXCEPTION) + M(1002, UNKNOWN_EXCEPTION) \ + M(1003, INVALID_SHARD_ID) /* See END */ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c4cf3803913..6f30c4523e9 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -420,6 +420,7 @@ class IColumn; M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \ \ M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) function countIf() function when logically equivalent", 0) \ + M(UInt64, insert_shard_id, 0, "If non zero, when insert intoi a distributed table, the data will be inserted into the shard `insert_shard_id` synchronously. Possible values range from 1 to `shards_number` of corresponding distributed table", 0) \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ \ M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \ diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index d21764bbb7d..c83133a55e6 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -383,11 +383,18 @@ void DistributedBlockOutputStream::writeSync(const Block & block) bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key; size_t start = 0; size_t end = shards_info.size(); - if (random_shard_insert) + + if (settings.insert_shard_id) + { + start = settings.insert_shard_id - 1; + end = settings.insert_shard_id; + } + else if (random_shard_insert) { start = storage.getRandomShardIndex(shards_info); end = start + 1; } + size_t num_shards = end - start; if (!pool) @@ -545,7 +552,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block) } -void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id) +void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id) { const auto & shard_info = cluster->getShardsInfo()[shard_id]; const auto & settings = context.getSettingsRef(); diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.h b/src/Storages/Distributed/DistributedBlockOutputStream.h index ef37776893a..ca57ad46fbb 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.h +++ b/src/Storages/Distributed/DistributedBlockOutputStream.h @@ -62,10 +62,10 @@ private: void writeSplitAsync(const Block & block); - void writeAsyncImpl(const Block & block, const size_t shard_id = 0); + void writeAsyncImpl(const Block & block, size_t shard_id = 0); /// Increments finished_writings_count after each repeat. - void writeToLocal(const Block & block, const size_t repeats); + void writeToLocal(const Block & block, size_t repeats); void writeToShard(const Block & block, const std::vector & dir_names); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index afd7d6b876e..a9d691d8c96 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -83,6 +83,7 @@ namespace ErrorCodes extern const int TYPE_MISMATCH; extern const int TOO_MANY_ROWS; extern const int UNABLE_TO_SKIP_UNUSED_SHARDS; + extern const int INVALID_SHARD_ID; } namespace ActionLocks @@ -541,22 +542,29 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta const auto & settings = context.getSettingsRef(); /// Ban an attempt to make async insert into the table belonging to DatabaseMemory - if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync) + if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync && !settings.insert_shard_id) { throw Exception("Storage " + getName() + " must have own data directory to enable asynchronous inserts", ErrorCodes::BAD_ARGUMENTS); } + auto shard_num = cluster->getLocalShardCount() + cluster->getRemoteShardCount(); + /// If sharding key is not specified, then you can only write to a shard containing only one shard - if (!settings.insert_distributed_one_random_shard && !has_sharding_key - && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2)) + if (!settings.insert_shard_id && !settings.insert_distributed_one_random_shard && !has_sharding_key && shard_num >= 2) { - throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided", - ErrorCodes::STORAGE_REQUIRES_PARAMETER); + throw Exception( + "Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided", + ErrorCodes::STORAGE_REQUIRES_PARAMETER); + } + + if (settings.insert_shard_id && settings.insert_shard_id > shard_num) + { + throw Exception("Shard id should be range from 1 to shard number", ErrorCodes::INVALID_SHARD_ID); } /// Force sync insertion if it is remote() table function - bool insert_sync = settings.insert_distributed_sync || owned_cluster; + bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster; auto timeout = settings.insert_distributed_timeout; /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster diff --git a/tests/queries/0_stateless/01684_insert_specify_shard_id.reference b/tests/queries/0_stateless/01684_insert_specify_shard_id.reference new file mode 100644 index 00000000000..e542790e401 --- /dev/null +++ b/tests/queries/0_stateless/01684_insert_specify_shard_id.reference @@ -0,0 +1,40 @@ +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 diff --git a/tests/queries/0_stateless/01684_insert_specify_shard_id.sql b/tests/queries/0_stateless/01684_insert_specify_shard_id.sql new file mode 100644 index 00000000000..d4a3ae6a48d --- /dev/null +++ b/tests/queries/0_stateless/01684_insert_specify_shard_id.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS x; + +CREATE TABLE x AS system.numbers ENGINE = MergeTree ORDER BY number; + +CREATE TABLE x_dist as x ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), x); + +-- insert into first shard +INSERT INTO x_dist SELECT * FROM numbers(10) settings insert_shard_id = 1; + +-- insert into second shard +INSERT INTO x_dist SELECT * FROM numbers(10, 10) settings insert_shard_id = 2; + +-- invalid shard id +INSERT INTO x_dist SELECT * FROM numbers(10) settings insert_shard_id = 3; -- { serverError 1003 } + +SELECT * FROM remote('127.0.0.1', currentDatabase(), x); +SELECT * FROM remote('127.0.0.2', currentDatabase(), x); + +SELECT * FROM x_dist ORDER by number; + +DROP TABLE x; +DROP TABLE x_dist;