add setting insert_shard_id

add test

fix style

fix
feng lv 2021-02-02 02:25:19 +00:00
parent c4a3acd4f8
commit 4279c7da41
7 changed files with 90 additions and 11 deletions


@@ -537,7 +537,8 @@
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \
-M(1002, UNKNOWN_EXCEPTION)
+M(1002, UNKNOWN_EXCEPTION) \
+M(1003, INVALID_SHARD_ID)
/* See END */


@@ -420,6 +420,7 @@ class IColumn;
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
\
M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) function countIf() function when logically equivalent", 0) \
+M(UInt64, insert_shard_id, 0, "If non-zero, when inserting into a distributed table, the data will be inserted into the shard `insert_shard_id` synchronously. Possible values range from 1 to `shards_number` of the corresponding distributed table", 0) \
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \


@@ -383,11 +383,18 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0;
size_t end = shards_info.size();
-if (random_shard_insert)
+if (settings.insert_shard_id)
+{
+start = settings.insert_shard_id - 1;
+end = settings.insert_shard_id;
+}
+else if (random_shard_insert)
{
start = storage.getRandomShardIndex(shards_info);
end = start + 1;
}
size_t num_shards = end - start;
if (!pool)
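For readers skimming the diff, here is a standalone sketch (not part of the commit) of the shard-range selection that writeSync now performs; selectShardRange and its parameters are hypothetical simplifications of the real member variables and helpers:

    #include <cstddef>
    #include <utility>

    /// Returns the half-open range [start, end) of shard indices the insert will cover.
    /// insert_shard_id is 1-based; 0 means the setting is not used.
    std::pair<size_t, size_t> selectShardRange(
        size_t insert_shard_id, bool random_shard_insert, size_t random_shard_index, size_t shards_count)
    {
        if (insert_shard_id)
            return {insert_shard_id - 1, insert_shard_id};        /// exactly one shard, chosen by the user
        if (random_shard_insert)
            return {random_shard_index, random_shard_index + 1};  /// exactly one shard, chosen at random
        return {0, shards_count};                                 /// all shards; the block is split by sharding key
    }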
@@ -545,7 +552,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
}
-void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
+void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context.getSettingsRef();


@@ -62,10 +62,10 @@ private:
void writeSplitAsync(const Block & block);
-void writeAsyncImpl(const Block & block, const size_t shard_id = 0);
+void writeAsyncImpl(const Block & block, size_t shard_id = 0);
/// Increments finished_writings_count after each repeat.
-void writeToLocal(const Block & block, const size_t repeats);
+void writeToLocal(const Block & block, size_t repeats);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);


@@ -83,6 +83,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
+extern const int INVALID_SHARD_ID;
}
namespace ActionLocks
@@ -541,22 +542,29 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
const auto & settings = context.getSettingsRef();
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
-if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync)
+if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync && !settings.insert_shard_id)
{
throw Exception("Storage " + getName() + " must have own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);
}
+auto shard_num = cluster->getLocalShardCount() + cluster->getRemoteShardCount();
/// If sharding key is not specified, then you can only write to a cluster containing only one shard
-if (!settings.insert_distributed_one_random_shard && !has_sharding_key
-&& ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
+if (!settings.insert_shard_id && !settings.insert_distributed_one_random_shard && !has_sharding_key && shard_num >= 2)
{
throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
throw Exception(
"Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
}
+if (settings.insert_shard_id && settings.insert_shard_id > shard_num)
+{
+throw Exception("Shard id should be in range from 1 to the number of shards", ErrorCodes::INVALID_SHARD_ID);
+}
/// Force sync insertion if it is remote() table function
-bool insert_sync = settings.insert_distributed_sync || owned_cluster;
+bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
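As a side note (not part of the diff), the new guard in StorageDistributed::write boils down to the following sketch; the function name and std::invalid_argument are stand-ins for the real Exception/ErrorCodes machinery and are assumptions for illustration only:

    #include <cstddef>
    #include <cstdint>
    #include <stdexcept>

    /// Simplified model of the checks above: a non-zero insert_shard_id must not exceed
    /// the total number of shards, and its presence forces the synchronous insert path.
    bool chooseSyncInsert(uint64_t insert_shard_id, bool insert_distributed_sync,
                          bool owned_cluster, size_t shard_num)
    {
        if (insert_shard_id && insert_shard_id > shard_num)
            throw std::invalid_argument("Shard id should be in range from 1 to the number of shards"); /// INVALID_SHARD_ID in the real code
        return insert_distributed_sync || insert_shard_id != 0 || owned_cluster;
    }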


@@ -0,0 +1,40 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


@@ -0,0 +1,22 @@
DROP TABLE IF EXISTS x;
CREATE TABLE x AS system.numbers ENGINE = MergeTree ORDER BY number;
CREATE TABLE x_dist as x ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), x);
-- insert into first shard
INSERT INTO x_dist SELECT * FROM numbers(10) settings insert_shard_id = 1;
-- insert into second shard
INSERT INTO x_dist SELECT * FROM numbers(10, 10) settings insert_shard_id = 2;
-- invalid shard id
INSERT INTO x_dist SELECT * FROM numbers(10) settings insert_shard_id = 3; -- { serverError 1003 }
SELECT * FROM remote('127.0.0.1', currentDatabase(), x);
SELECT * FROM remote('127.0.0.2', currentDatabase(), x);
SELECT * FROM x_dist ORDER BY number;
DROP TABLE x;
DROP TABLE x_dist;