Fix random_shard_insert issue

Amos Bird 2021-04-15 21:12:39 +08:00
parent 9b546f3b89
commit bf5b668f85
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
4 changed files with 62 additions and 37 deletions
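
In short: random_shard_insert (true when insert_distributed_one_random_shard is enabled and the Distributed table has no sharding key) becomes a member flag computed once in the constructor. Judging from the diff, writeSync previously picked the random shard before the deferred pool initialization, so writing jobs were only ever created for the shard chosen for the first block, and the row-permutation loop ran over ext::range(0, num_shards) (that is, shard index 0) regardless of which shard was chosen. Now jobs are initialized for all shards, a fresh random shard is picked for every block after initialization, only that shard's jobs are scheduled on a single-slot pool, and waitForJobs expects exactly one finished job in this mode. A sketch of the resulting control flow follows the first file's diff.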

View File

@@ -108,6 +108,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
     if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
         throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
     context->getClientInfo().distributed_depth += 1;
+    random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
 }
@@ -148,9 +149,6 @@ void DistributedBlockOutputStream::write(const Block & block)
 void DistributedBlockOutputStream::writeAsync(const Block & block)
 {
-    const Settings & settings = context->getSettingsRef();
-    bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
-
     if (random_shard_insert)
     {
         writeAsyncImpl(block, storage.getRandomShardIndex(cluster->getShardsInfo()));
@@ -256,12 +254,20 @@ void DistributedBlockOutputStream::waitForJobs()
         }
     }
-    size_t jobs_count = remote_jobs_count + local_jobs_count;
     size_t num_finished_jobs = finished_jobs_count;
 
-    if (num_finished_jobs < jobs_count)
-        LOG_WARNING(log, "Expected {} writing jobs, but finished only {}", jobs_count, num_finished_jobs);
+    if (random_shard_insert)
+    {
+        if (finished_jobs_count != 1)
+            LOG_WARNING(log, "Expected 1 writing jobs when doing random shard insert, but finished {}", num_finished_jobs);
+    }
+    else
+    {
+        size_t jobs_count = remote_jobs_count + local_jobs_count;
+        if (num_finished_jobs < jobs_count)
+            LOG_WARNING(log, "Expected {} writing jobs, but finished only {}", jobs_count, num_finished_jobs);
+    }
 }
 
 ThreadPool::Job
@@ -393,7 +399,6 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
 {
     const Settings & settings = context->getSettingsRef();
     const auto & shards_info = cluster->getShardsInfo();
-    bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
     size_t start = 0;
     size_t end = shards_info.size();
@@ -402,20 +407,13 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
         start = settings.insert_shard_id - 1;
         end = settings.insert_shard_id;
     }
-    else if (random_shard_insert)
-    {
-        start = storage.getRandomShardIndex(shards_info);
-        end = start + 1;
-    }
-
-    size_t num_shards = end - start;
 
     if (!pool)
     {
         /// Deferred initialization. Only for sync insertion.
         initWritingJobs(block, start, end);
-        pool.emplace(remote_jobs_count + local_jobs_count);
+        pool.emplace(random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count));
 
         if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes))
         {
@@ -428,12 +426,20 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
     watch_current_block.restart();
 
+    if (random_shard_insert)
+    {
+        start = storage.getRandomShardIndex(shards_info);
+        end = start + 1;
+    }
+
+    size_t num_shards = end - start;
+
     if (num_shards > 1)
     {
         auto current_selector = createSelector(block);
 
-        /// Prepare row numbers for each shard
-        for (size_t shard_index : ext::range(0, num_shards))
+        /// Prepare row numbers for needed shards
+        for (size_t shard_index : ext::range(start, end))
             per_shard_jobs[shard_index].shard_current_block_permutation.resize(0);
 
         for (size_t i = 0; i < block.rows(); ++i)
@@ -444,7 +450,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
     {
         /// Run jobs in parallel for each block and wait them
         finished_jobs_count = 0;
-        for (size_t shard_index : ext::range(0, shards_info.size()))
+        for (size_t shard_index : ext::range(start, end))
             for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
                 pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
     }

View File

@@ -93,6 +93,7 @@ private:
     size_t inserted_rows = 0;
 
     bool insert_sync;
+    bool random_shard_insert;
 
     /// Sync-related stuff
     UInt64 insert_timeout; // in seconds

View File

@ -1,8 +1,22 @@
0
0
1 1
1 1
2 0
1
2 2
3 3
3 4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

View File

@@ -1,22 +1,26 @@
-drop table if exists shard;
+create database if not exists shard_0;
+create database if not exists shard_1;
+drop table if exists shard_0.tbl;
+drop table if exists shard_1.tbl;
 drop table if exists distr;
 
-create table shard (id Int32) engine = MergeTree order by cityHash64(id);
-create table distr as shard engine Distributed (test_cluster_two_shards_localhost, currentDatabase(), shard);
-
-insert into distr (id) values (0), (1); -- { serverError 55; }
+create table shard_0.tbl (number UInt64) engine = MergeTree order by number;
+create table shard_1.tbl (number UInt64) engine = MergeTree order by number;
+create table distr (number UInt64) engine = Distributed(test_cluster_two_shards_different_databases, '', tbl);
 
 set insert_distributed_sync = 1;
-
-insert into distr (id) values (0), (1); -- { serverError 55; }
-
-set insert_distributed_sync = 0;
 set insert_distributed_one_random_shard = 1;
+set max_block_size = 1;
+set max_insert_block_size = 1;
+set min_insert_block_size_rows = 1;
+insert into distr select number from numbers(20);
 
-insert into distr (id) values (0), (1);
-insert into distr (id) values (2), (3);
+select count() != 0 from shard_0.tbl;
+select count() != 0 from shard_1.tbl;
+select * from distr order by number;
 
-select * from distr order by id;
-
-drop table if exists shard;
-drop table if exists distr;
+drop table if exists shard_0.tbl;
+drop table if exists shard_1.tbl;
+drop database shard_0;
+drop database shard_1;
+drop table distr;
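
A note on why the rewritten test is stable (my arithmetic, not part of the commit): min_insert_block_size_rows = 1 together with the two block-size settings splits the insert ... select ... from numbers(20) into 20 single-row blocks, and with the fix each block independently picks one of the two shards. The new reference file above is accordingly 1 twice (the two count() != 0 checks) followed by 0..19 from the final select. Assuming equal shard weights (the real getRandomShardIndex is weight-aware), a count() != 0 check can only fail if all 20 blocks land on the same shard:

#include <cmath>
#include <cstdio>

int main()
{
    const int blocks = 20;
    /// P(a fixed shard receives nothing) = (1/2)^20. The two "shard empty"
    /// events are disjoint, since both shards cannot be empty at once,
    /// so the probabilities simply add.
    double p_flaky = 2.0 * std::pow(0.5, blocks);
    std::printf("P(some shard stays empty) = %.3g\n", p_flaky); /// ~1.9e-06
}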